diff mbox series

[v2] Execute subprocess IPC in separate thread

Message ID 20201213022130.42375-1-sava.jakovljev@teufel.de
State Changes Requested
Headers show
Series [v2] Execute subprocess IPC in separate thread | expand

Commit Message

Sava Jakovljev Dec. 13, 2020, 2:21 a.m. UTC
Signed-off-by: Sava Jakovljev <sava.jakovljev@teufel.de>
---
Subprocesses may want to alter global SWUpdate state.
To allow this, subprocess RPC must be handled while the network thread is
kept free, in order to avoid deadlocks; one such deadlock currently exists
in the Suricatta activation IPC.

Changes from v1 of this patch are to correctly handle the OOM situation by
sending a NACK message back, and to declare the subprocess thread functions
static.

 core/network_thread.c | 217 ++++++++++++++++++++++++++++--------------
 1 file changed, 145 insertions(+), 72 deletions(-)

--
2.26.2

Comments

Stefano Babic Dec. 20, 2020, 11:23 a.m. UTC | #1
Hi Sava,

I have a few points to be clarified:

On 13.12.20 03:21, Sava Jakovljev wrote:
> Signed-off-by: Sava Jakovljev <sava.jakovljev@teufel.de>
> ---
> Subprocesses may want to alter global SWUpdate state.
> To allow this, subprocess RPC must be handled while the network thread is
> kept free, in order to avoid deadlocks; one such deadlock currently exists
> in the Suricatta activation IPC.
> 
> Changes from v1 of this patch are to correctly handle the OOM situation by
> sending a NACK message back, and to declare the subprocess thread functions
> static.
> 
>   core/network_thread.c | 217 ++++++++++++++++++++++++++++--------------
>   1 file changed, 145 insertions(+), 72 deletions(-)
> 
> diff --git a/core/network_thread.c b/core/network_thread.c
> index 0e6080d..64be58f 100644
> --- a/core/network_thread.c
> +++ b/core/network_thread.c
> @@ -56,6 +56,20 @@ static unsigned long nrmsgs = 0;
> 
>   static pthread_mutex_t msglock = PTHREAD_MUTEX_INITIALIZER;
> 
> +struct subprocess_msg_elem {
> +	ipc_message message;
> +	int client;
> +	SIMPLEQ_ENTRY(subprocess_msg_elem) next;
> +};
> +
> +SIMPLEQ_HEAD(subprocess_msglist, subprocess_msg_elem);
> +static struct subprocess_msglist subprocess_messages;
> +
> +static pthread_t subprocess_ipc_handler_thread_id;
> +static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
> +static unsigned long subprocess_messages_count = 0;
> +
>   static bool is_selection_allowed(const char *software_set, char *running_mode,
>   				 struct dict const *acceptedlist)
>   {
> @@ -239,6 +253,110 @@ static void unlink_socket(void)
>   	unlink(get_ctrl_socket());
>   }
> 
> +static void send_subprocess_reply(struct subprocess_msg_elem *subprocess_msg)
> +{
> +	if (write(subprocess_msg->client, &subprocess_msg->message,
> +			sizeof(subprocess_msg->message)) < 0)
> +		printf("Error write on socket ctrl");

printf() ?

Everything flows into SWUpdate's notification system via TRACE/DEBUG/ERROR/...;
printf() should be avoided.

> +}
> +
> +static void handle_subprocess_ipc(struct subprocess_msg_elem *subprocess_msg)
> +{
> +	ipc_message *msg = &subprocess_msg->message;
> +	int pipe = pctl_getfd_from_type(msg->data.procmsg.source);
> +	if (pipe < 0) {
> +		ERROR("Cannot find channel for requested process");
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	TRACE("Received Message for %s",
> +		pctl_getname_from_type(msg->data.procmsg.source));
> +	if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
> +		ERROR("Pipe not available or closed: %d", pipe);
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	/*
> +	 * Cleanup the queue to be sure there are not
> +	 * outstanding messages
> +	 */
> +	empty_pipe(pipe);
> +
> +	int ret = write(pipe, msg, sizeof(*msg));
> +	if (ret != sizeof(*msg)) {
> +		ERROR("Writing to pipe failed !");
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	/*
> +	 * Do not block forever for an answer
> +	 * This would block the whole thread
> +	 * If a message requires more time,
> +	 * the destination process should sent an
> +	 * answer back explaining this in the payload
> +	 */
> +	fd_set pipefds;
> +	FD_ZERO(&pipefds);
> +	FD_SET(pipe, &pipefds);
> +
> +	struct timeval tv;
> +	tv.tv_usec = 0;
> +	if (!msg->data.procmsg.timeout)
> +		tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
> +	else
> +		tv.tv_sec = msg->data.procmsg.timeout;
> +	ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
> +
> +	/*
> +	 * If there is an error or timeout,
> +	 * send a NACK back
> +	 */
> +	if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
> +		msg->type = NACK;
> +
> +		return;
> +	}
> +
> +	ret = read(pipe, msg, sizeof(*msg));
> +	if (ret != sizeof(*msg)) {
> +		ERROR("Reading from pipe failed !");
> +		msg->type = NACK;
> +	}
> +}

This code is just moved into a separate function (this makes the code 
easier to understand, too). I guess there is no functional difference.

> +
> +static void *subprocess_thread (void *data)
> +{
> +	(void)data;
> +	pthread_mutex_lock(&subprocess_msg_lock);
> +

I am not sure about the synchronization, and whether there is a race at 
startup.

This thread is started and takes the mutex. The parent thread goes on; if 
this thread has been scheduled in between, it already holds the mutex.

Later, the parent thread has:

+				pthread_mutex_lock(&subprocess_msg_lock);
+				++subprocess_messages_count;
+				SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
+				pthread_cond_signal(&subprocess_wkup);
+				pthread_mutex_unlock(&subprocess_msg_lock);

but if handle_subprocess_ipc() was scheduled once, this thread holds the 
mutex and is waiting on the condition variable. When the parent then tries 
to take the mutex, it is blocked and cannot signal the condition variable. 
Am I wrong?

> +	while(1) {
> +		pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
> +
> +		while(subprocess_messages_count) {
> +			struct subprocess_msg_elem *subprocess_msg;
> +			subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
> +			SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);
> +			--subprocess_messages_count;
> +			pthread_mutex_unlock(&subprocess_msg_lock);
> +
> +			handle_subprocess_ipc(subprocess_msg);
> +			send_subprocess_reply(subprocess_msg);
> +			close(subprocess_msg->client);
> +
> +			free(subprocess_msg);
> +			pthread_mutex_lock(&subprocess_msg_lock);
> +		}
> +	}
> +
> +	return NULL;
> +}
> +
>   void *network_thread (void *data)
>   {
>   	struct installer *instp = (struct installer *)data;
> @@ -249,17 +367,18 @@ void *network_thread (void *data)
>   	int nread;
>   	struct msg_elem *notification;
>   	int ret;
> -	int pipe;
> -	fd_set pipefds;
> -	struct timeval tv;
>   	update_state_t value;
> +	struct subprocess_msg_elem *subprocess_msg;
> 
>   	if (!instp) {
>   		TRACE("Fatal error: Network thread aborting...");
>   		return (void *)0;
>   	}
> 
> +	subprocess_ipc_handler_thread_id = start_thread(subprocess_thread, NULL);
> +
>   	SIMPLEQ_INIT(&notifymsgs);
> +	SIMPLEQ_INIT(&subprocess_messages);
>   	register_notifier(network_notifier);
> 
>   	/* Initialize and bind to UDS */
> @@ -287,7 +406,8 @@ void *network_thread (void *data)
>   		nread = read(ctrlconnfd, (void *)&msg, sizeof(msg));
> 
>   		if (nread != sizeof(msg)) {
> -			TRACE("IPC message too short: fragmentation not supported");
> +			TRACE("IPC message too short: fragmentation not supported (read %d bytes)",
> +				nread);
>   			close(ctrlconnfd);
>   			continue;
>   		}
> @@ -309,72 +429,22 @@ void *network_thread (void *data)
>   				}
>   				break;
>   			case SWUPDATE_SUBPROCESS:
> -				/*
> -				 *  this request is not for the installer,
> -				 *  but for one of the subprocesses
> -				 *  forward the request without checking
> -				 *  the payload
> -				 */
> -
> -				pipe = pctl_getfd_from_type(msg.data.procmsg.source);
> -				if (pipe < 0) {
> -					ERROR("Cannot find channel for requested process");
> -					msg.type = NACK;
> -					break;
> -				}
> -				TRACE("Received Message for %s",
> -					pctl_getname_from_type(msg.data.procmsg.source));
> -				if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
> -					ERROR("Pipe not available or closed: %d", pipe);
> +				subprocess_msg = (struct subprocess_msg_elem*)malloc(
> +						sizeof(struct subprocess_msg_elem));
> +				if (subprocess_msg == NULL) {
> +					ERROR("Cannot handle subprocess IPC because of OOM.");
>   					msg.type = NACK;
>   					break;
>   				}
> 
> -				/*
> -				 * Cleanup the queue to be sure there are not
> -				 * outstanding messages
> -				 */
> -				empty_pipe(pipe);
> -
> -				ret = write(pipe, &msg, sizeof(msg));
> -				if (ret != sizeof(msg)) {
> -					ERROR("Writing to pipe failed !");
> -					msg.type = NACK;
> -					break;
> -				}
> -
> -				/*
> -				 * Do not block forever for an answer
> -				 * This would block the whole thread
> -				 * If a message requires more time,
> -				 * the destination process should sent an
> -				 * answer back explaining this in the payload
> -				 */
> -				FD_ZERO(&pipefds);
> -				FD_SET(pipe, &pipefds);
> -				tv.tv_usec = 0;
> -				if (!msg.data.procmsg.timeout)
> -					tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
> -				else
> -					tv.tv_sec = msg.data.procmsg.timeout;
> -				ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
> -
> -				/*
> -				 * If there is an error or timeout,
> -				 * send a NACK back
> -				 */
> -				if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
> -					msg.type = NACK;
> -					break;
> -				}
> -
> -				ret = read(pipe, &msg, sizeof(msg));
> -				if (ret != sizeof(msg)) {
> -					ERROR("Reading from pipe failed !");
> -					msg.type = NACK;
> -					break;
> -				}
> +				subprocess_msg->client = ctrlconnfd;
> +				subprocess_msg->message = msg;
> 
> +				pthread_mutex_lock(&subprocess_msg_lock);
> +				++subprocess_messages_count;
> +				SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
> +				pthread_cond_signal(&subprocess_wkup);
> +				pthread_mutex_unlock(&subprocess_msg_lock);
>   				/*
>   				 * ACK/NACK was inserted by the called SUBPROCESS
>   				 * It should not be touched here
> @@ -453,12 +523,15 @@ void *network_thread (void *data)
>   			msg.type = NACK;
>   			sprintf(msg.data.msg, "Wrong request: aborting");
>   		}
> -		ret = write(ctrlconnfd, &msg, sizeof(msg));
> -		if (ret < 0)
> -			printf("Error write on socket ctrl");
> 
> -		if (msg.type != ACK)
> -			close(ctrlconnfd);
> +		if (msg.type == ACK || msg.type == NACK || msg.type == GET_STATUS) {
> +			ret = write(ctrlconnfd, &msg, sizeof(msg));
> +			if (ret < 0)
> +				printf("Error write on socket ctrl");
> +
> +			if (msg.type != ACK)
> +				close(ctrlconnfd);
> +		}
>   		pthread_mutex_unlock(&stream_mutex);
>   	} while (1);
>   	return (void *)0;
> --
> 2.26.2
> 

Best regards,
Stefano Babic
Sava Jakovljev Dec. 20, 2020, 6:44 p.m. UTC | #2
Hello Stefano,

Thank you for your review and your comments.

Regarding the printf's, I just copied the code from the networking thread, 
where printf is also used - but you're absolutely right, I will change the 
printf's to INFO/WARN/ERROR in every occurrence.

Regarding the possible race condition, in order to ensure that everything 
is started up correctly, I can make the following changes:
1. Call SIMPLEQ_INIT(&subprocess_messages) before creating the thread (a 
very stupid mistake on my part, I'm sorry). In the current implementation 
this is not a problem, since pthread_cond_wait is called first; a small 
possibility of error remains in case of a spurious wakeup, which definitely 
should be avoided, so initializing the queue first is absolutely mandatory.
2. Modify the subprocess thread to be a bit more robust in the following way:
pthread_mutex_lock(&subprocess_msg_lock);

while (1) {
	while (!SIMPLEQ_EMPTY(&subprocess_messages)) {
		struct subprocess_msg_elem *subprocess_msg;
		subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
		SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);

		pthread_mutex_unlock(&subprocess_msg_lock);

		handle_subprocess_ipc(subprocess_msg);
		send_subprocess_reply(subprocess_msg);
		close(subprocess_msg->client);

		free(subprocess_msg);
		pthread_mutex_lock(&subprocess_msg_lock);
	}

	pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
}

This will ensure that calling pthread_cond_wait is always done correctly, 
even during start-up: the predicate check ensures that everything is 
handled correctly - does this approach look better to you?
Also, I noticed the existence of the SIMPLEQ_EMPTY helper, so there is no 
need for the subprocess_messages_count variable.
In case of normal start-up, pthread_cond_wait will unlock the mutex and 
data race scenarios are avoided.
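
The enqueue side in network_thread() would then just become the following 
(a sketch with the same names as in the patch, only the counter dropped - 
not tested):

	case SWUPDATE_SUBPROCESS:
		subprocess_msg = (struct subprocess_msg_elem*)malloc(
				sizeof(struct subprocess_msg_elem));
		if (subprocess_msg == NULL) {
			ERROR("Cannot handle subprocess IPC because of OOM.");
			msg.type = NACK;
			break;
		}

		subprocess_msg->client = ctrlconnfd;
		subprocess_msg->message = msg;

		/* hand the request over to the subprocess thread;
		 * the reply to the client is sent from there */
		pthread_mutex_lock(&subprocess_msg_lock);
		SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
		pthread_cond_signal(&subprocess_wkup);
		pthread_mutex_unlock(&subprocess_msg_lock);
		break;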

Thank you.
Best regards,
Sava Jakovljev
Stefano Babic Dec. 21, 2020, 9:03 a.m. UTC | #3
Hi Sava,

On 20.12.20 19:44, Sava Jakovljev wrote:
> Hello Stefano,
> 
> Thank you for your review and your comments.
> 
> Regarding the printf's, I just copied the code from the networking thread,
> where printf is also used - but you're absolutely right, I will change
> the printf's to INFO/WARN/ERROR in every occurrence.
> 
> Regarding the possible race condition, in order to ensure that
> everything is started up correctly, I can make the following changes:
> 1. Call SIMPLEQ_INIT(&subprocess_messages) before creating the thread
> (a very stupid mistake on my part, I'm sorry). In the current
> implementation this is not a problem, since pthread_cond_wait is called
> first; a small possibility of error remains in case of a spurious
> wakeup, which definitely should be avoided, so initializing the queue
> first is absolutely mandatory.

Ok

> 2. Modify the subprocess thread to be a bit more robust in the following way:
> pthread_mutex_lock(&subprocess_msg_lock);
> 
> while (1) {
> 	while (!SIMPLEQ_EMPTY(&subprocess_messages)) {
> 		struct subprocess_msg_elem *subprocess_msg;
> 		subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
> 		SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);
> 
> 		pthread_mutex_unlock(&subprocess_msg_lock);
> 
> 		handle_subprocess_ipc(subprocess_msg);
> 		send_subprocess_reply(subprocess_msg);
> 		close(subprocess_msg->client);
> 
> 		free(subprocess_msg);
> 		pthread_mutex_lock(&subprocess_msg_lock);
> 	}
> 
> 	pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
> }

Why is the locking and unlocking inside the loop checking the messages?
There is a different behavior depending on whether there are messages in
the queue.

Is the following not more straightforward?

thread() {

	while (1) {
		pthread_mutex_lock(&subprocess_msg_lock);
		pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
		while (!SIMPLEQ_EMPTY(&subprocess_messages)) {
			.....
		}
		pthread_mutex_unlock(&subprocess_msg_lock);
	}
}


> 
> This will ensure that calling pthread_cond_wait is always done correctly,
> even during start-up: the predicate check ensures that everything is
> handled correctly - does this approach look better to you?
> Also, I noticed the existence of the SIMPLEQ_EMPTY helper, so there is no
> need for the subprocess_messages_count variable.

Yes, it is better.
 
> In case of normal start-up, pthread_cond_wait will unlock the mutex and
> data race scenarios are avoided.

Best regards,
Stefano Babic

Sava Jakovljev Dec. 21, 2020, 9:48 a.m. UTC | #4
Hello Stefano,

The mutex must be unlocked while waiting for the reply from the subprocess. 
Otherwise a subtle error is introduced that can lead to a deadlock:
1. The networking thread passes the request to the subprocess thread.
2. The subprocess thread invokes the subprocess and keeps the mutex locked.
3. In the meantime, another subprocess request is received by the 
networking thread, but it cannot lock the mutex, so it waits.
4. The first subprocess request may itself send a network IPC, which will 
then be stuck while the subprocess thread waits for a reply that will 
never come (see the annotated loop below).
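
To make the ordering explicit, this is where the unlock has to sit in the 
loop from my previous mail (annotated sketch, error handling as before):

	while (!SIMPLEQ_EMPTY(&subprocess_messages)) {
		struct subprocess_msg_elem *subprocess_msg;
		subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
		SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);

		/*
		 * Release the lock before talking to the subprocess:
		 * handle_subprocess_ipc() can block up to procmsg.timeout
		 * (or DEFAULT_INTERNAL_TIMEOUT) seconds, and the networking
		 * thread must still be able to enqueue new requests in the
		 * meantime (step 3 above).
		 */
		pthread_mutex_unlock(&subprocess_msg_lock);

		handle_subprocess_ipc(subprocess_msg);
		send_subprocess_reply(subprocess_msg);
		close(subprocess_msg->client);
		free(subprocess_msg);

		/* re-acquire the lock before checking the queue again */
		pthread_mutex_lock(&subprocess_msg_lock);
	}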

Thank you,
Sava Jakovljev

diff mbox series

Patch

diff --git a/core/network_thread.c b/core/network_thread.c
index 0e6080d..64be58f 100644
--- a/core/network_thread.c
+++ b/core/network_thread.c
@@ -56,6 +56,20 @@  static unsigned long nrmsgs = 0;

 static pthread_mutex_t msglock = PTHREAD_MUTEX_INITIALIZER;

+struct subprocess_msg_elem {
+	ipc_message message;
+	int client;
+	SIMPLEQ_ENTRY(subprocess_msg_elem) next;
+};
+
+SIMPLEQ_HEAD(subprocess_msglist, subprocess_msg_elem);
+static struct subprocess_msglist subprocess_messages;
+
+static pthread_t subprocess_ipc_handler_thread_id;
+static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
+static unsigned long subprocess_messages_count = 0;
+
 static bool is_selection_allowed(const char *software_set, char *running_mode,
 				 struct dict const *acceptedlist)
 {
@@ -239,6 +253,110 @@  static void unlink_socket(void)
 	unlink(get_ctrl_socket());
 }

+static void send_subprocess_reply(struct subprocess_msg_elem *subprocess_msg)
+{
+	if (write(subprocess_msg->client, &subprocess_msg->message,
+			sizeof(subprocess_msg->message)) < 0)
+		printf("Error write on socket ctrl");
+}
+
+static void handle_subprocess_ipc(struct subprocess_msg_elem *subprocess_msg)
+{
+	ipc_message *msg = &subprocess_msg->message;
+	int pipe = pctl_getfd_from_type(msg->data.procmsg.source);
+	if (pipe < 0) {
+		ERROR("Cannot find channel for requested process");
+		msg->type = NACK;
+
+		return;
+	}
+
+	TRACE("Received Message for %s",
+		pctl_getname_from_type(msg->data.procmsg.source));
+	if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
+		ERROR("Pipe not available or closed: %d", pipe);
+		msg->type = NACK;
+
+		return;
+	}
+
+	/*
+	 * Cleanup the queue to be sure there are not
+	 * outstanding messages
+	 */
+	empty_pipe(pipe);
+
+	int ret = write(pipe, msg, sizeof(*msg));
+	if (ret != sizeof(*msg)) {
+		ERROR("Writing to pipe failed !");
+		msg->type = NACK;
+
+		return;
+	}
+
+	/*
+	 * Do not block forever for an answer
+	 * This would block the whole thread
+	 * If a message requires more time,
+	 * the destination process should sent an
+	 * answer back explaining this in the payload
+	 */
+	fd_set pipefds;
+	FD_ZERO(&pipefds);
+	FD_SET(pipe, &pipefds);
+
+	struct timeval tv;
+	tv.tv_usec = 0;
+	if (!msg->data.procmsg.timeout)
+		tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
+	else
+		tv.tv_sec = msg->data.procmsg.timeout;
+	ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
+
+	/*
+	 * If there is an error or timeout,
+	 * send a NACK back
+	 */
+	if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
+		msg->type = NACK;
+
+		return;
+	}
+
+	ret = read(pipe, msg, sizeof(*msg));
+	if (ret != sizeof(*msg)) {
+		ERROR("Reading from pipe failed !");
+		msg->type = NACK;
+	}
+}
+
+static void *subprocess_thread (void *data)
+{
+	(void)data;
+	pthread_mutex_lock(&subprocess_msg_lock);
+
+	while(1) {
+		pthread_cond_wait(&subprocess_wkup, &subprocess_msg_lock);
+
+		while(subprocess_messages_count) {
+			struct subprocess_msg_elem *subprocess_msg;
+			subprocess_msg = SIMPLEQ_FIRST(&subprocess_messages);
+			SIMPLEQ_REMOVE_HEAD(&subprocess_messages, next);
+			--subprocess_messages_count;
+			pthread_mutex_unlock(&subprocess_msg_lock);
+
+			handle_subprocess_ipc(subprocess_msg);
+			send_subprocess_reply(subprocess_msg);
+			close(subprocess_msg->client);
+
+			free(subprocess_msg);
+			pthread_mutex_lock(&subprocess_msg_lock);
+		}
+	}
+
+	return NULL;
+}
+
 void *network_thread (void *data)
 {
 	struct installer *instp = (struct installer *)data;
@@ -249,17 +367,18 @@  void *network_thread (void *data)
 	int nread;
 	struct msg_elem *notification;
 	int ret;
-	int pipe;
-	fd_set pipefds;
-	struct timeval tv;
 	update_state_t value;
+	struct subprocess_msg_elem *subprocess_msg;

 	if (!instp) {
 		TRACE("Fatal error: Network thread aborting...");
 		return (void *)0;
 	}

+	subprocess_ipc_handler_thread_id = start_thread(subprocess_thread, NULL);
+
 	SIMPLEQ_INIT(&notifymsgs);
+	SIMPLEQ_INIT(&subprocess_messages);
 	register_notifier(network_notifier);

 	/* Initialize and bind to UDS */
@@ -287,7 +406,8 @@  void *network_thread (void *data)
 		nread = read(ctrlconnfd, (void *)&msg, sizeof(msg));

 		if (nread != sizeof(msg)) {
-			TRACE("IPC message too short: fragmentation not supported");
+			TRACE("IPC message too short: fragmentation not supported (read %d bytes)",
+				nread);
 			close(ctrlconnfd);
 			continue;
 		}
@@ -309,72 +429,22 @@  void *network_thread (void *data)
 				}
 				break;
 			case SWUPDATE_SUBPROCESS:
-				/*
-				 *  this request is not for the installer,
-				 *  but for one of the subprocesses
-				 *  forward the request without checking
-				 *  the payload
-				 */
-
-				pipe = pctl_getfd_from_type(msg.data.procmsg.source);
-				if (pipe < 0) {
-					ERROR("Cannot find channel for requested process");
-					msg.type = NACK;
-					break;
-				}
-				TRACE("Received Message for %s",
-					pctl_getname_from_type(msg.data.procmsg.source));
-				if (fcntl(pipe, F_GETFL) < 0 && errno == EBADF) {
-					ERROR("Pipe not available or closed: %d", pipe);
+				subprocess_msg = (struct subprocess_msg_elem*)malloc(
+						sizeof(struct subprocess_msg_elem));
+				if (subprocess_msg == NULL) {
+					ERROR("Cannot handle subprocess IPC because of OOM.");
 					msg.type = NACK;
 					break;
 				}

-				/*
-				 * Cleanup the queue to be sure there are not
-				 * outstanding messages
-				 */
-				empty_pipe(pipe);
-
-				ret = write(pipe, &msg, sizeof(msg));
-				if (ret != sizeof(msg)) {
-					ERROR("Writing to pipe failed !");
-					msg.type = NACK;
-					break;
-				}
-
-				/*
-				 * Do not block forever for an answer
-				 * This would block the whole thread
-				 * If a message requires more time,
-				 * the destination process should sent an
-				 * answer back explaining this in the payload
-				 */
-				FD_ZERO(&pipefds);
-				FD_SET(pipe, &pipefds);
-				tv.tv_usec = 0;
-				if (!msg.data.procmsg.timeout)
-					tv.tv_sec = DEFAULT_INTERNAL_TIMEOUT;
-				else
-					tv.tv_sec = msg.data.procmsg.timeout;
-				ret = select(pipe + 1, &pipefds, NULL, NULL, &tv);
-
-				/*
-				 * If there is an error or timeout,
-				 * send a NACK back
-				 */
-				if (ret <= 0 || !FD_ISSET(pipe, &pipefds)) {
-					msg.type = NACK;
-					break;
-				}
-
-				ret = read(pipe, &msg, sizeof(msg));
-				if (ret != sizeof(msg)) {
-					ERROR("Reading from pipe failed !");
-					msg.type = NACK;
-					break;
-				}
+				subprocess_msg->client = ctrlconnfd;
+				subprocess_msg->message = msg;

+				pthread_mutex_lock(&subprocess_msg_lock);
+				++subprocess_messages_count;
+				SIMPLEQ_INSERT_TAIL(&subprocess_messages, subprocess_msg, next);
+				pthread_cond_signal(&subprocess_wkup);
+				pthread_mutex_unlock(&subprocess_msg_lock);
 				/*
 				 * ACK/NACK was inserted by the called SUBPROCESS
 				 * It should not be touched here
@@ -453,12 +523,15 @@  void *network_thread (void *data)
 			msg.type = NACK;
 			sprintf(msg.data.msg, "Wrong request: aborting");
 		}
-		ret = write(ctrlconnfd, &msg, sizeof(msg));
-		if (ret < 0)
-			printf("Error write on socket ctrl");

-		if (msg.type != ACK)
-			close(ctrlconnfd);
+		if (msg.type == ACK || msg.type == NACK || msg.type == GET_STATUS) {
+			ret = write(ctrlconnfd, &msg, sizeof(msg));
+			if (ret < 0)
+				printf("Error write on socket ctrl");
+
+			if (msg.type != ACK)
+				close(ctrlconnfd);
+		}
 		pthread_mutex_unlock(&stream_mutex);
 	} while (1);
 	return (void *)0;