diff mbox series

[v1,8/8] suricatta: separate ipc into separate thread

Message ID 20211015082457.6804-9-roland.gaudig-oss@weidmueller.com
State Changes Requested
Headers show
Series suricatta: ipc: add request to get hawkBit server status | expand

Commit Message

Roland Gaudig Oct. 15, 2021, 8:24 a.m. UTC
From: Roland Gaudig <roland.gaudig@weidmueller.com>

In case of problems with the network connection to the hawkBit server
some requests may take a very long time. During that time also calls to
the server IPC are not answered. Therefore, the server IPC functions are
moved into a separate thread.

Signed-off-by: Roland Gaudig <roland.gaudig@weidmueller.com>
---

 include/channel_op_res.h   |  3 +-
 suricatta/common.c         |  1 +
 suricatta/server_hawkbit.c | 32 +++++++++++-
 suricatta/suricatta.c      | 99 +++++++++++++++++++++++++++-----------
 4 files changed, 105 insertions(+), 30 deletions(-)
diff mbox series

Patch

diff --git a/include/channel_op_res.h b/include/channel_op_res.h
index 104ce32..3612305 100644
--- a/include/channel_op_res.h
+++ b/include/channel_op_res.h
@@ -25,5 +25,6 @@  typedef enum {
 	CHANNEL_ENOTFOUND,
 	CHANNEL_EREDIRECT,
 	CHANNEL_ESSLCERT,
-	CHANNEL_ESSLCONNECT
+	CHANNEL_ESSLCONNECT,
+	CHANNEL_REQUEST_PENDING,
 } channel_op_res_t;
diff --git a/suricatta/common.c b/suricatta/common.c
index 3a9ac60..28c5e94 100644
--- a/suricatta/common.c
+++ b/suricatta/common.c
@@ -58,6 +58,7 @@  server_op_res_t map_channel_retcode(channel_op_res_t response)
 	case CHANNEL_EAGAIN:
 	case CHANNEL_ESSLCERT:
 	case CHANNEL_ESSLCONNECT:
+	case CHANNEL_REQUEST_PENDING:
 		return SERVER_EAGAIN;
 	case CHANNEL_EACCES:
 		return SERVER_EACCES;
diff --git a/suricatta/server_hawkbit.c b/suricatta/server_hawkbit.c
index 6dd1f8c..f4de35e 100644
--- a/suricatta/server_hawkbit.c
+++ b/suricatta/server_hawkbit.c
@@ -58,6 +58,7 @@  static struct option long_options[] = {
 
 static unsigned short mandatory_argument_count = 0;
 static pthread_mutex_t notifylock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t ipc_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 
 /*
  * See hawkBit's API for an explanation
@@ -521,15 +522,25 @@  server_op_res_t server_set_config_data(json_object *json_root)
 	tmp = json_get_data_url(json_root, "configData");
 
 	if (tmp != NULL) {
+		pthread_mutex_lock(&ipc_lock);
 		if (server_hawkbit.configData_url)
 			free(server_hawkbit.configData_url);
 		server_hawkbit.configData_url = tmp;
 		server_hawkbit.has_to_send_configData = (get_target_data_length() > 0) ? true : false;
 		TRACE("ConfigData: %s", server_hawkbit.configData_url);
+		pthread_mutex_unlock(&ipc_lock);
 	}
 	return SERVER_OK;
 }
 
+static void report_server_status(int server_status)
+{
+	pthread_mutex_lock(&ipc_lock);
+	server_hawkbit.server_status = server_status;
+	server_hawkbit.server_status_time = time(NULL);
+	pthread_mutex_unlock(&ipc_lock);
+}
+
 static server_op_res_t server_get_device_info(channel_t *channel, channel_data_t *channel_data)
 {
 	assert(channel != NULL);
@@ -551,8 +562,7 @@  static server_op_res_t server_get_device_info(channel_t *channel, channel_data_t
 
 	channel_op_res_t ch_response = channel->get(channel, (void *)channel_data);
 
-	server_hawkbit.server_status = ch_response;
-	server_hawkbit.server_status_time = time(NULL);
+	report_server_status(ch_response);
 	if ((result = map_channel_retcode(ch_response)) !=
 	    SERVER_OK) {
 		goto cleanup;
@@ -1466,12 +1476,14 @@  int get_target_data_length(void)
 	int len = 0;
 	struct dict_entry *entry;
 
+	pthread_mutex_lock(&ipc_lock);
 	LIST_FOREACH(entry, &server_hawkbit.configdata, next) {
 		char *key = dict_entry_get_key(entry);
 		char *value = dict_entry_get_value(entry);
 
 		len += strlen(key) + strlen(value) + strlen (" : ") + 6;
 	}
+	pthread_mutex_unlock(&ipc_lock);
 
 	return len;
 }
@@ -1506,6 +1518,7 @@  server_op_res_t server_send_target_data(void)
 	);
 
 	char *keyvalue = NULL;
+	pthread_mutex_lock(&ipc_lock);
 	LIST_FOREACH(entry, &server_hawkbit.configdata, next) {
 		char *key = dict_entry_get_key(entry);
 		char *value = dict_entry_get_value(entry);
@@ -1517,6 +1530,7 @@  server_op_res_t server_send_target_data(void)
 				value)) {
 			ERROR("hawkBit server reply cannot be sent because of OOM.");
 			result = SERVER_EINIT;
+			pthread_mutex_unlock(&ipc_lock);
 			goto cleanup;
 		}
 		first = false;
@@ -1525,6 +1539,7 @@  server_op_res_t server_send_target_data(void)
 		free(keyvalue);
 
 	}
+	pthread_mutex_unlock(&ipc_lock);
 
 	TRACE("CONFIGDATA=%s", configData);
 
@@ -1678,6 +1693,7 @@  server_op_res_t server_start(char *fname, int argc, char *argv[])
 
 	mandatory_argument_count = 0;
 
+	pthread_mutex_lock(&ipc_lock);
 	LIST_INIT(&server_hawkbit.configdata);
 	LIST_INIT(&server_hawkbit.httpheaders);
 
@@ -1702,6 +1718,7 @@  server_op_res_t server_start(char *fname, int argc, char *argv[])
 		}
 		swupdate_cfg_destroy(&handle);
 	}
+	pthread_mutex_unlock(&ipc_lock);
 
 	if (loglevel >= DEBUGLEVEL) {
 		server_hawkbit.debug = true;
@@ -1875,6 +1892,13 @@  server_op_res_t server_start(char *fname, int argc, char *argv[])
 	 */
 	server_hawkbit.has_to_send_configData = true;
 
+	/*
+	 * The following loop might block for a long time, if server does
+	 * not respond immediately. Therefore, report pending request on
+	 * server status IPC.
+	 */
+	report_server_status(CHANNEL_REQUEST_PENDING);
+
 	/*
 	 * If in WAIT state, the updated was finished
 	 * by an external process and we have to wait for it
@@ -2039,6 +2063,7 @@  static server_op_res_t server_configuration_ipc(ipc_message *msg)
 
 	json_data = json_get_path_key(
 	    json_root, (const char *[]){"polling", NULL});
+	pthread_mutex_lock(&ipc_lock);
 	if (json_data) {
 		polling = json_object_get_int(json_data);
 		if (polling > 0) {
@@ -2056,6 +2081,7 @@  static server_op_res_t server_configuration_ipc(ipc_message *msg)
 		server_hawkbit.has_to_send_configData = true;
 	}
 
+	pthread_mutex_unlock(&ipc_lock);
 	return SERVER_OK;
 }
 
@@ -2066,11 +2092,13 @@  static server_op_res_t server_status_ipc(ipc_message *msg)
 		.tv_usec = 0
 	};
 
+	pthread_mutex_lock(&ipc_lock);
 	sprintf(msg->data.procmsg.buf,
 		"{\"server\":{\"status\":%d,\"time\":\"%s\"}}",
 		server_hawkbit.server_status,
 		swupdate_time_iso8601(&tv));
 	msg->data.procmsg.len = strlen(msg->data.procmsg.buf);
+	pthread_mutex_unlock(&ipc_lock);
 
 	return SERVER_OK;
 }
diff --git a/suricatta/suricatta.c b/suricatta/suricatta.c
index 29bba2e..a50236b 100644
--- a/suricatta/suricatta.c
+++ b/suricatta/suricatta.c
@@ -11,7 +11,9 @@ 
 #include <stdio.h>
 #include <util.h>
 #include <errno.h>
+#include <semaphore.h>
 #include <signal.h>
+#include <time.h>
 #include <sys/select.h>
 #include <getopt.h>
 #include <json-c/json.h>
@@ -29,6 +31,7 @@  static struct option long_options[] = {
     {"enable", no_argument, NULL, 'e'},
     {"disable", no_argument, NULL, 'd'},
     {NULL, 0, NULL, 0}};
+static sem_t suricatta_enable_sema;
 
 void suricatta_print_help(void)
 {
@@ -58,6 +61,8 @@  static server_op_res_t suricatta_enable(ipc_message *msg)
 	    json_root, (const char *[]){"enable", NULL});
 	if (json_data) {
 		enable = json_object_get_boolean(json_data);
+		if (sem_post(&suricatta_enable_sema))
+			ERROR("sem_post enable failled");
 		TRACE ("suricatta mode %sabled", enable ? "en" : "dis");
 	}
 	else {
@@ -72,6 +77,8 @@  static server_op_res_t suricatta_enable(ipc_message *msg)
 	      json_root, (const char *[]){"trigger", NULL});
 	  if (json_data) {
 	    trigger = json_object_get_boolean(json_data);
+	    if (sem_post(&suricatta_enable_sema))
+		    ERROR("sem_post trigger failled");
 	    TRACE ("suricatta polling trigger received, checking on server");
 	  }
 
@@ -82,7 +89,7 @@  static server_op_res_t suricatta_enable(ipc_message *msg)
 	return SERVER_OK;
 }
 
-static server_op_res_t suricatta_ipc(int fd, time_t *seconds)
+static server_op_res_t suricatta_ipc(int fd)
 {
 	ipc_message msg;
 	server_op_res_t result = SERVER_OK;
@@ -91,18 +98,9 @@  static server_op_res_t suricatta_ipc(int fd, time_t *seconds)
 	ret = read(fd, &msg, sizeof(msg));
 	if (ret != sizeof(msg))
 		return SERVER_EERR;
-
 	switch (msg.data.procmsg.cmd) {
 	case CMD_ENABLE:
 		result = suricatta_enable(&msg);
-		/*
-		 * Note: enable works as trigger, too.
-		 * After enable is set, suricatta will try to contact
-		 * the server to check for pending action
-		 * This is done by resetting the number of seconds to
-		 * wait for.
-		 */
-		*seconds = 0;
 		break;
 	default:
 		result = server.ipc(&msg);
@@ -127,30 +125,67 @@  static int suricatta_settings(void *elem, void  __attribute__ ((__unused__)) *da
 
 int suricatta_wait(int seconds)
 {
-	fd_set readfds;
-	struct timeval tv;
+	struct timespec tp;
 	int retval;
+	int enable_entry = enable;
 
-	tv.tv_sec = seconds;
-	tv.tv_usec = 0;
-	FD_ZERO(&readfds);
-	FD_SET(sw_sockfd, &readfds);
-	DEBUG("Sleeping for %ld seconds.", tv.tv_sec);
-	retval = select(sw_sockfd + 1, &readfds, NULL, NULL, &tv);
-	if (retval < 0) {
-		TRACE("Suricatta awakened because of: %s", strerror(errno));
-		return 0;
-	}
-	if (retval && FD_ISSET(sw_sockfd, &readfds)) {
-		TRACE("Suricatta woke up for IPC at %ld seconds", tv.tv_sec);
-		if (suricatta_ipc(sw_sockfd, &tv.tv_sec) != SERVER_OK){
-			DEBUG("Handling IPC failed!");
+	clock_gettime(CLOCK_REALTIME, &tp);
+	int t_entry = tp.tv_sec;
+
+	tp.tv_sec += seconds;
+	DEBUG("Sleeping for %d seconds.", seconds);
+	retval = sem_timedwait(&suricatta_enable_sema, &tp);
+
+	if (retval) {
+		if (errno != ETIMEDOUT) {
+			TRACE("Suricatta awakened because of: %s", strerror(errno));
+			return 0;
 		}
-		return (int)tv.tv_sec;
+		/* else: Suricatta awakened because timeout expired */
+	} else {
+		/* suricatta_enable_sema unlocked */
+		time_t t_wake = time(NULL);
+
+		TRACE("Suricatta woke up for IPC at %ld seconds", t_wake - t_entry);
+		/*
+		 * Note: enable works as trigger, too.
+		 * After enable is set, suricatta will try to contact
+		 * the server to check for pending action
+		 * This is done by resetting the number of seconds to
+		 * wait for.
+		 */
+		if (trigger || (enable && !enable_entry))
+			return 0;
+		else
+			return seconds - (t_wake - t_entry);
 	}
+
 	return 0;
 }
 
+static void *ipc_thread(void __attribute__ ((__unused__)) *data)
+{
+	fd_set readfds;
+	int retval;
+
+	while (1) {
+		FD_ZERO(&readfds);
+		FD_SET(sw_sockfd, &readfds);
+		retval = select(sw_sockfd + 1, &readfds, NULL, NULL, NULL);
+
+		if (retval < 0) {
+			TRACE("Suricatta IPC awakened because of: %s", strerror(errno));
+			return 0;
+		}
+
+		if (retval && FD_ISSET(sw_sockfd, &readfds)) {
+			if (suricatta_ipc(sw_sockfd) != SERVER_OK) {
+				DEBUG("Handling IPC failed!");
+			}
+		}
+	}
+}
+
 int start_suricatta(const char *cfgfname, int argc, char *argv[])
 {
 	int action_id;
@@ -206,6 +241,16 @@  int start_suricatta(const char *cfgfname, int argc, char *argv[])
 		}
 	}
 
+	if (sem_init(&suricatta_enable_sema, 0, 0)) {
+		ERROR("Initialising suricatta enable semaphore failed");
+		exit(EXIT_FAILURE);
+	}
+
+	/*
+	 * Start ipc thread here, because the following server.start might block
+	 */
+	start_thread(ipc_thread, NULL);
+
 	/*
 	 * Now start a specific implementation of the server
 	 */