diff mbox series

[v3,1/1] IPC: add status streaming support

Message ID 20210916072612.2022-1-james.hilliard1@gmail.com
State Accepted
Headers show
Series [v3,1/1] IPC: add status streaming support | expand

Commit Message

James Hilliard Sept. 16, 2021, 7:26 a.m. UTC
This adds a NOTIFY_STREAM command to the ctrl interface. It is
implemented with history replay so that clients can also see
notify messages that were sent before they connected.

This has the advantage of making it possible for multiple clients
to monitor notify messages at the same time.

Signed-off-by: James Hilliard <james.hilliard1@gmail.com>
---
Changes v2 -> v3:
  - unlock stream_mutex on OOM error

Changes v1 -> v2:
  - move stream interface into network_ipc ctrl socket
  - add history replay
---
 core/network_thread.c         | 128 +++++++++++++++++++++++++++++++++-
 include/network_ipc.h         |  11 ++-
 ipc/network_ipc.c             |  79 +++++++++++++++++++++
 mongoose/mongoose_interface.c |  39 +++++++----
 4 files changed, 242 insertions(+), 15 deletions(-)

Comments

Stefano Babic Sept. 24, 2021, 9:30 a.m. UTC | #1
Hi James,

On 16.09.21 09:26, James Hilliard wrote:
> This adds a NOTIFY_STREAM command to the ctrl interface. It is
> implemented with history replay so that clients can also see
> notify messages that were sent before they connected.
> 
> This has the advantage of making it possible for multiple clients
> to monitor notify messages at the same time.
> 
> Signed-off-by: James Hilliard <james.hilliard1@gmail.com>
> ---
> Changes v2 -> v3:
>    - unlock stream_mutex on OOM error
> 
> Changes v1 -> v2:
>    - move stream interface into network_ipc ctrl socket
>    - add history replay
> ---

The patch looks great to me. I ran some (simple) tests myself, and I hope
that after it is pushed to -master we will get more feedback and testing
from the community in case there are any issues.

Applied to -master, thanks !

Best regards,
Stefano Babic
diff mbox series

Patch

diff --git a/core/network_thread.c b/core/network_thread.c
index adaf21c..27e8fd8 100644
--- a/core/network_thread.c
+++ b/core/network_thread.c
@@ -69,6 +69,14 @@  static pthread_t subprocess_ipc_handler_thread_id;
 static pthread_mutex_t subprocess_msg_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t subprocess_wkup = PTHREAD_COND_INITIALIZER;
 
+struct notify_conn {
+	SIMPLEQ_ENTRY(notify_conn) next;
+	int sockfd;
+};
+
+SIMPLEQ_HEAD(connections, notify_conn);
+static struct connections notify_conns;
+
 static bool is_selection_allowed(const char *software_set, char *running_mode,
 				 struct dict const *acceptedlist)
 {
@@ -116,11 +124,61 @@  static void clean_msg(char *msg, char drop)
 	}
 }
 
+static int write_notify_msg(ipc_message *msg, int sockfd)
+{
+	void *buf;
+	size_t count;
+	ssize_t n;
+	int ret = 0;
+
+	buf = msg;
+	count = sizeof(*msg);
+	while (count > 0) {
+		n = send(sockfd, buf, count, MSG_NOSIGNAL);
+		if (n <= 0) {
+			/*
+			 * The notify methods cannot be used for error logging here: the caller holds msglock, so the notifier would deadlock trying to take it.
+			 */
+			if (n == 0) {
+				fprintf(stderr, "Error: A status client is not responding, removing it.\n");
+			} else {
+				fprintf(stderr, "A status client disappeared, removing it: %s\n", strerror(errno));
+			}
+			ret = -1;
+			break;
+		}
+		count -= (size_t)n;
+		buf = (char*)buf + n;
+	}
+	return ret;
+}
+
+/*
+ * Send msg to every registered notify client, dropping clients whose
+ * socket write fails. Must be called with the msglock mutex held.
+ */
+static void send_notify_msg(ipc_message *msg)
+{
+	struct notify_conn *conn, *tmp;
+	int ret;
+
+	SIMPLEQ_FOREACH_SAFE(conn, &notify_conns, next, tmp) {
+		ret = write_notify_msg(msg, conn->sockfd);
+		if (ret < 0) {
+			close(conn->sockfd);
+			SIMPLEQ_REMOVE(&notify_conns, conn,
+						   notify_conn, next);
+			free(conn);
+		}
+	}
+}
+
 static void network_notifier(RECOVERY_STATUS status, int error, int level, const char *msg)
 {
 	int len = msg ? strlen(msg) : 0;
 	struct msg_elem *newmsg = (struct msg_elem *)calloc(1, sizeof(*newmsg) + len + 1);
 	struct msg_elem *oldmsg;
+	ipc_message ipcmsg;
 
 	if (!newmsg)
 		return;
@@ -148,6 +206,18 @@  static void network_notifier(RECOVERY_STATUS status, int error, int level, const
 
 
 	SIMPLEQ_INSERT_TAIL(&notifymsgs, newmsg, next);
+
+	ipcmsg.magic = IPC_MAGIC;
+	ipcmsg.type = NOTIFY_STREAM;
+	memset(ipcmsg.data.msg, 0, sizeof(ipcmsg.data.msg));
+
+	strncpy(ipcmsg.data.notify.msg, newmsg->msg,
+			sizeof(ipcmsg.data.notify.msg) - 1);
+	ipcmsg.data.notify.status = newmsg->status;
+	ipcmsg.data.notify.error = newmsg->error;
+	ipcmsg.data.notify.level = newmsg->level;
+	send_notify_msg(&ipcmsg);
+
 	pthread_mutex_unlock(&msglock);
 }
 
@@ -366,7 +436,8 @@  void *network_thread (void *data)
 	struct sockaddr_un cliaddr;
 	ipc_message msg;
 	int nread;
-	struct msg_elem *notification;
+	struct msg_elem *notification, *tmp;
+	struct notify_conn *conn;
 	int ret;
 	update_state_t value;
 	struct subprocess_msg_elem *subprocess_msg;
@@ -378,6 +449,7 @@  void *network_thread (void *data)
 	}
 
 	SIMPLEQ_INIT(&notifymsgs);
+	SIMPLEQ_INIT(&notify_conns);
 	SIMPLEQ_INIT(&subprocess_messages);
 	register_notifier(network_notifier);
 
@@ -507,6 +579,60 @@  void *network_thread (void *data)
 				}
 				pthread_mutex_unlock(&msglock);
 
+				break;
+			case NOTIFY_STREAM:
+				msg.type = ACK;
+				memset(msg.data.msg, 0, sizeof(msg.data.msg));
+				msg.data.status.current = instp->status;
+				msg.data.status.last_result = instp->last_install;
+				msg.data.status.error = instp->last_error;
+
+				ret = write(ctrlconnfd, &msg, sizeof(msg));
+				msg.type = NOTIFY_STREAM;
+				if (ret < 0) {
+					ERROR("Error write notify ack on socket ctrl");
+					close(ctrlconnfd);
+					break;
+				}
+
+				/* Get first notification from the queue */
+				pthread_mutex_lock(&msglock);
+				notification = SIMPLEQ_FIRST(&notifymsgs);
+
+				/* Send notify history */
+				SIMPLEQ_FOREACH_SAFE(notification, &notifymsgs, next, tmp) {
+					memset(msg.data.msg, 0, sizeof(msg.data.msg));
+
+					strncpy(msg.data.notify.msg, notification->msg,
+							sizeof(msg.data.notify.msg) - 1);
+					msg.data.notify.status = notification->status;
+					msg.data.notify.error = notification->error;
+					msg.data.notify.level = notification->level;
+
+					ret = write_notify_msg(&msg, ctrlconnfd);
+					if (ret < 0) {
+						pthread_mutex_unlock(&msglock);
+						ERROR("Error write notify history on socket ctrl");
+						close(ctrlconnfd);
+						break;
+					}
+				}
+
+				/*
+				 * Save the new connection to send notifications to
+				 */
+				conn = (struct notify_conn *)calloc(1, sizeof(*conn));
+				if (!conn) {
+					pthread_mutex_unlock(&msglock);
+					ERROR("Out of memory, skipping...");
+					close(ctrlconnfd);
+					pthread_mutex_unlock(&stream_mutex);
+					continue;
+				}
+				conn->sockfd = ctrlconnfd;
+				SIMPLEQ_INSERT_TAIL(&notify_conns, conn, next);
+				pthread_mutex_unlock(&msglock);
+
 				break;
 			case SET_AES_KEY:
 #ifndef CONFIG_PKCS11
diff --git a/include/network_ipc.h b/include/network_ipc.h
index 1a3d957..f370ccf 100644
--- a/include/network_ipc.h
+++ b/include/network_ipc.h
@@ -37,7 +37,8 @@  typedef enum {
 	SET_UPDATE_STATE,	/* set bootloader ustate */
 	GET_UPDATE_STATE,
 	REQ_INSTALL_EXT,
-	SET_VERSIONS_RANGE
+	SET_VERSIONS_RANGE,
+	NOTIFY_STREAM
 } msgtype;
 
 /*
@@ -80,6 +81,12 @@  typedef union {
 		int error;
 		char desc[2048];
 	} status;
+	struct {
+		int status;
+		int error;
+		int level;
+		char msg[2048];
+	} notify;
 	struct {
 		struct swupdate_request req;
 		unsigned int len;    /* Len of data valid in buf */
@@ -122,6 +129,8 @@  int ipc_send_data(int connfd, char *buf, int size);
 void ipc_end(int connfd);
 int ipc_get_status(ipc_message *msg);
 int ipc_get_status_timeout(ipc_message *msg, unsigned int timeout_ms);
+int ipc_notify_connect(void);
+int ipc_notify_receive(int *connfd, ipc_message *msg);
 int ipc_postupdate(ipc_message *msg);
 int ipc_send_cmd(ipc_message *msg);
 
diff --git a/ipc/network_ipc.c b/ipc/network_ipc.c
index 767079d..5ccc18b 100644
--- a/ipc/network_ipc.c
+++ b/ipc/network_ipc.c
@@ -163,6 +163,85 @@  int ipc_get_status_timeout(ipc_message *msg, unsigned int timeout_ms)
 	return ret == 0 ? sizeof(*msg) : -1;
 }
 
+static int __ipc_start_notify(int connfd, ipc_message *msg, unsigned int timeout_ms)
+{
+	fd_set fds;
+	struct timeval tv;
+
+	memset(msg, 0, sizeof(*msg));
+	msg->magic = IPC_MAGIC;
+	msg->type = NOTIFY_STREAM;
+
+	if (write(connfd, msg, sizeof(*msg)) != sizeof(*msg))
+		return -1;
+
+	if (timeout_ms) {
+		FD_ZERO(&fds);
+		FD_SET(connfd, &fds);
+
+		/*
+		 * Invalidate the message;
+		 * the caller should check it
+		 */
+		msg->magic = 0;
+
+		tv.tv_sec = 0;
+		tv.tv_usec = timeout_ms * 1000;
+		if ((select(connfd + 1, &fds, NULL, NULL, &tv) <= 0) ||
+		!FD_ISSET(connfd, &fds))
+			return -ETIMEDOUT;
+	}
+
+	return -(read(connfd, msg, sizeof(*msg)) != sizeof(*msg));
+}
+
+int ipc_notify_connect(void)
+{
+	int ret;
+	int connfd;
+	ipc_message msg;
+
+	connfd = prepare_ipc();
+	if (connfd < 0)
+		return -1;
+
+	/*
+	 * Initialize the notify stream
+	 */
+	ret = __ipc_start_notify(connfd, &msg, 0);
+	if (ret || msg.type != ACK) {
+		fprintf(stdout, "Notify connection handshake failed..\n");
+		close(connfd);
+		return ret;
+	}
+
+	return connfd;
+}
+
+int ipc_notify_receive(int *connfd, ipc_message *msg)
+{
+	int ret = read(*connfd, msg, sizeof(*msg));
+
+	if (ret == -1 && (errno == EAGAIN || errno == EINTR))
+		return 0;
+
+	if (ret != sizeof(*msg)) {
+		fprintf(stdout, "Connection closing..\n");
+		close(*connfd);
+		*connfd = -1;
+		return -1;
+	}
+
+	if (msg->magic != IPC_MAGIC) {
+		fprintf(stdout, "Connection closing, invalid magic...\n");
+		close(*connfd);
+		*connfd = -1;
+		return -1;
+	}
+
+	return ret;
+}
+
 int ipc_inst_start_ext(void *priv, ssize_t size)
 {
 	int connfd;
diff --git a/mongoose/mongoose_interface.c b/mongoose/mongoose_interface.c
index 2e9416b..1878101 100644
--- a/mongoose/mongoose_interface.c
+++ b/mongoose/mongoose_interface.c
@@ -141,31 +141,44 @@  static void broadcast(struct mg_mgr *mgr, char *str)
 
 static void *broadcast_message_thread(void *data)
 {
+	int fd = -1;
+
 	for (;;) {
 		ipc_message msg;
-		int ret = ipc_get_status(&msg);
+		int ret;
 
-		if (!ret && strlen(msg.data.status.desc) != 0) {
+		if (fd < 0)
+			fd = ipc_notify_connect();
+		/*
+		 * If connecting still fails, try again later
+		 */
+		if (fd < 0) {
+			sleep(1);
+			continue;
+		}
+
+		ret = ipc_notify_receive(&fd, &msg);
+		if (ret != sizeof(msg))
+			return NULL;
+
+		if (strlen(msg.data.notify.msg) != 0) {
 			struct mg_mgr *mgr = (struct mg_mgr *) data;
 			char text[4096];
 			char str[4160];
 
-			snescape(text, sizeof(text), msg.data.status.desc);
+			snescape(text, sizeof(text), msg.data.notify.msg);
 
 			snprintf(str, sizeof(str),
-				"{\r\n"
-				"\t\"type\": \"message\",\r\n"
-				"\t\"level\": \"%d\",\r\n"
-				"\t\"text\": \"%s\"\r\n"
-				"}\r\n",
-				(msg.data.status.error) ? 3 : 6, /* RFC 5424 */
-				text);
+					 "{\r\n"
+					 "\t\"type\": \"message\",\r\n"
+					 "\t\"level\": \"%d\",\r\n"
+					 "\t\"text\": \"%s\"\r\n"
+					 "}\r\n",
+					 msg.data.notify.level, /* RFC 5424 */
+					 text);
 
 			broadcast(mgr, str);
-			continue;
 		}
-
-		usleep(50 * 1000);
 	}
 
 	return NULL;