ipa: Change ipa_msg_recv() to support partial receive
diff mbox

Message ID 1396256012-11236-1-git-send-email-jerlbeck@sysmocom.de
State Accepted
Headers show

Commit Message

Jacob Erlbeck March 31, 2014, 8:53 a.m. UTC
Currently ipa_msg_recv() fails, when messages are received partially.

This patch provides a new function ipa_msg_recv_buffered() that uses
an additional ** to a message buffer to store partial data.  When
this happens, -EAGAIN is returned. If NULL is used, the function
behaves similar to ipa_msg_recv() and fails on partial read.
In addition in case of errors the return value is now always -EXXX
and the contents of errno is undefined.

Note that this feature needs support by the calling code insofar that
*tmp_msg must be set to NULL initially and it must be freed and
set to NULL manually when the socket is closed.

Note also that ipa_msg_recv() is then a wrapper around
ipa_msg_recv_buffered() which mimics the old error behaviour by
setting errno explicitely to -rc and returning -1 when an error has
happened.

Ticket: OW#728
Sponsored-by: On-Waves ehf
---
 TODO-RELEASE                    |    1 +
 include/osmocom/abis/e1_input.h |    2 +
 include/osmocom/abis/ipa.h      |    3 +
 src/input/ipa.c                 |  151 ++++++++++++++++++++++++++++++---------
 src/input/ipaccess.c            |   13 +++-
 tests/ipa_recv/ipa_recv_test.c  |   55 +++++++++-----
 tests/ipa_recv/ipa_recv_test.ok |   27 ++++++-
 7 files changed, 198 insertions(+), 54 deletions(-)

Comments

Holger Freyther March 31, 2014, 1:07 p.m. UTC | #1
On Mon, Mar 31, 2014 at 10:53:32AM +0200, Jacob Erlbeck wrote:

Dear Jacob,


> +		/* first read our 3-byte header */
> +		needed = sizeof(*hh) - msg->len;
> +		ret = recv(fd, msg->tail, needed, 0);
> +		if (ret == 0)
> +		       goto discard_msg;
> +
> +		if (ret < 0) {
> +			if (errno == EAGAIN || errno == EINTR)
> +				ret = 0;
> +			else {
> +				ret = -errno;
> +				goto discard_msg;
> +			}
> +		}
> +
> +		msgb_put(msg, ret);
> +
> +		if (ret < needed) {
> +			if (msg->len == 0) {
> +				ret = -EAGAIN;
> +				goto discard_msg;
> +			}
> +
> +			LOGP(DLINP, LOGL_INFO,
> +			     "Received part of IPA message header (%d/%d)\n",
> +			     msg->len, sizeof(*hh));

			^ tab vs. spaces?


> +			if (!tmp_msg) {
> +				ret = -EIO;
> +				goto discard_msg;
> +			}
> +			*tmp_msg = msg;
> +			return -EAGAIN;
> +		}
> +
> +		msg->l2h = msg->tail;
> +	}

> +	if (needed > 0) {
> +		ret = recv(fd, msg->tail, needed, 0);
> +
> +		if (ret == 0)
> +			goto discard_msg;
> +
> +		if (ret < 0) {
> +			if (errno == EAGAIN || errno == EINTR)
> +				ret = 0;
> +			else {
> +				ret = -errno;
> +				goto discard_msg;
> +			}
> +		}
> +
> +		msgb_put(msg, ret);
> +
> +		if (ret < needed) {
> +			LOGP(DLINP, LOGL_INFO,
> +			     "Received part of IPA message L2 data (%d/%d)\n",
> +			    msgb_l2len(msg), len);
> +			if (!tmp_msg) {
> +				ret = -EIO;
> +				goto discard_msg;
> +			}
> +			*tmp_msg = msg;
> +			return -EAGAIN;
> +		}
>  	}


Do you think readability would be improved if these two paths could be
united? I will push and make releases today for this new feature.

Patch
diff mbox

diff --git a/TODO-RELEASE b/TODO-RELEASE
index 43b1e8e..71931db 100644
--- a/TODO-RELEASE
+++ b/TODO-RELEASE
@@ -1 +1,2 @@ 
 #library	what		description / commit summary line
+libosmoabis	abi-change	ipa: Change ipa_msg_recv() to support partial receive
diff --git a/include/osmocom/abis/e1_input.h b/include/osmocom/abis/e1_input.h
index 9b77893..cf8677b 100644
--- a/include/osmocom/abis/e1_input.h
+++ b/include/osmocom/abis/e1_input.h
@@ -109,6 +109,8 @@  struct e1inp_ts {
 			struct osmo_fd fd;
 		} rs232;
 	} driver;
+
+	struct msgb *pending_msg;
 };
 
 struct gsm_e1_subslot {
diff --git a/include/osmocom/abis/ipa.h b/include/osmocom/abis/ipa.h
index d577d74..982b694 100644
--- a/include/osmocom/abis/ipa.h
+++ b/include/osmocom/abis/ipa.h
@@ -27,6 +27,7 @@  struct ipa_server_conn {
 	int (*closed_cb)(struct ipa_server_conn *peer);
 	int (*cb)(struct ipa_server_conn *peer, struct msgb *msg);
 	void				*data;
+	struct msgb			*pending_msg;
 };
 
 struct ipa_server_conn *ipa_server_conn_create(void *ctx, struct ipa_server_link *link, int fd, int (*cb)(struct ipa_server_conn *peer, struct msgb *msg), int (*closed_cb)(struct ipa_server_conn *peer), void *data);
@@ -53,6 +54,7 @@  struct ipa_client_conn {
 	int (*read_cb)(struct ipa_client_conn *link, struct msgb *msg);
 	int (*write_cb)(struct ipa_client_conn *link);
 	void				*data;
+	struct msgb			*pending_msg;
 };
 
 struct ipa_client_conn *ipa_client_conn_create(void *ctx, struct e1inp_ts *ts, int priv_nr, const char *addr, uint16_t port, void (*updown)(struct ipa_client_conn *link, int), int (*read_cb)(struct ipa_client_conn *link, struct msgb *msgb), int (*write_cb)(struct ipa_client_conn *link), void *data);
@@ -64,6 +66,7 @@  void ipa_client_conn_close(struct ipa_client_conn *link);
 void ipa_client_conn_send(struct ipa_client_conn *link, struct msgb *msg);
 
 int ipa_msg_recv(int fd, struct msgb **rmsg);
+int ipa_msg_recv_buffered(int fd, struct msgb **rmsg, struct msgb **tmp_msg);
 
 int ipaccess_rcvmsg_base(struct msgb *msg, struct osmo_fd *bfd);
 
diff --git a/src/input/ipa.c b/src/input/ipa.c
index b5abd36..71e1227 100644
--- a/src/input/ipa.c
+++ b/src/input/ipa.c
@@ -49,50 +49,130 @@  void ipa_msg_push_header(struct msgb *msg, uint8_t proto)
 
 int ipa_msg_recv(int fd, struct msgb **rmsg)
 {
-	struct msgb *msg;
+	int rc = ipa_msg_recv_buffered(fd, rmsg, NULL);
+	if (rc < 0) {
+		errno = -rc;
+		rc = -1;
+	}
+	return rc;
+}
+
+int ipa_msg_recv_buffered(int fd, struct msgb **rmsg, struct msgb **tmp_msg)
+{
+	struct msgb *msg = tmp_msg ? *tmp_msg : NULL;
 	struct ipaccess_head *hh;
 	int len, ret;
+	int needed;
 
-	msg = ipa_msg_alloc(0);
 	if (msg == NULL)
-		return -ENOMEM;
+		msg = ipa_msg_alloc(0);
 
-	/* first read our 3-byte header */
-	hh = (struct ipaccess_head *) msg->data;
-	ret = recv(fd, msg->data, sizeof(*hh), 0);
-	if (ret <= 0) {
-		msgb_free(msg);
-		return ret;
-	} else if (ret != sizeof(*hh)) {
-		LOGP(DLINP, LOGL_ERROR, "too small message received\n");
-		msgb_free(msg);
-		return -EIO;
+	if (msg == NULL) {
+		ret = -ENOMEM;
+		goto discard_msg;
 	}
-	msgb_put(msg, ret);
+
+	if (msg->l2h == NULL) {
+		/* first read our 3-byte header */
+		needed = sizeof(*hh) - msg->len;
+		ret = recv(fd, msg->tail, needed, 0);
+		if (ret == 0)
+		       goto discard_msg;
+
+		if (ret < 0) {
+			if (errno == EAGAIN || errno == EINTR)
+				ret = 0;
+			else {
+				ret = -errno;
+				goto discard_msg;
+			}
+		}
+
+		msgb_put(msg, ret);
+
+		if (ret < needed) {
+			if (msg->len == 0) {
+				ret = -EAGAIN;
+				goto discard_msg;
+			}
+
+			LOGP(DLINP, LOGL_INFO,
+			     "Received part of IPA message header (%d/%d)\n",
+			     msg->len, sizeof(*hh));
+			if (!tmp_msg) {
+				ret = -EIO;
+				goto discard_msg;
+			}
+			*tmp_msg = msg;
+			return -EAGAIN;
+		}
+
+		msg->l2h = msg->tail;
+	}
+
+	hh = (struct ipaccess_head *) msg->data;
 
 	/* then read the length as specified in header */
-	msg->l2h = msg->data + sizeof(*hh);
 	len = ntohs(hh->len);
 
 	if (len < 0 || IPA_ALLOC_SIZE < len + sizeof(*hh)) {
 		LOGP(DLINP, LOGL_ERROR, "bad message length of %d bytes, "
-					"received %d bytes\n", len, ret);
-		msgb_free(msg);
-		return -EIO;
+					"received %d bytes\n", len, msg->len);
+		ret = -EIO;
+		goto discard_msg;
 	}
 
-	ret = recv(fd, msg->l2h, len, 0);
-	if (ret <= 0) {
-		msgb_free(msg);
-		return ret;
-	} else if (ret < len) {
-		LOGP(DLINP, LOGL_ERROR, "truncated message received\n");
-		msgb_free(msg);
-		return -EIO;
+	needed = len - msgb_l2len(msg);
+
+	if (needed > 0) {
+		ret = recv(fd, msg->tail, needed, 0);
+
+		if (ret == 0)
+			goto discard_msg;
+
+		if (ret < 0) {
+			if (errno == EAGAIN || errno == EINTR)
+				ret = 0;
+			else {
+				ret = -errno;
+				goto discard_msg;
+			}
+		}
+
+		msgb_put(msg, ret);
+
+		if (ret < needed) {
+			LOGP(DLINP, LOGL_INFO,
+			     "Received part of IPA message L2 data (%d/%d)\n",
+			    msgb_l2len(msg), len);
+			if (!tmp_msg) {
+				ret = -EIO;
+				goto discard_msg;
+			}
+			*tmp_msg = msg;
+			return -EAGAIN;
+		}
 	}
-	msgb_put(msg, ret);
+
+	ret = msgb_l2len(msg);
+
+	if (ret == 0) {
+		LOGP(DLINP, LOGL_INFO,
+		     "Discarding IPA message without payload\n");
+		ret = -EAGAIN;
+		goto discard_msg;
+	}
+
+	if (tmp_msg)
+		*tmp_msg = NULL;
 	*rmsg = msg;
 	return ret;
+
+discard_msg:
+	if (tmp_msg)
+		*tmp_msg = NULL;
+	msgb_free(msg);
+	return ret;
 }
 
 void ipa_client_conn_close(struct ipa_client_conn *link)
@@ -103,6 +183,8 @@  void ipa_client_conn_close(struct ipa_client_conn *link)
 		close(link->ofd->fd);
 		link->ofd->fd = -1;
 	}
+	msgb_free(link->pending_msg);
+	link->pending_msg = NULL;
 }
 
 static void ipa_client_read(struct ipa_client_conn *link)
@@ -113,11 +195,12 @@  static void ipa_client_read(struct ipa_client_conn *link)
 
 	LOGP(DLINP, LOGL_DEBUG, "message received\n");
 
-	ret = ipa_msg_recv(ofd->fd, &msg);
+	ret = ipa_msg_recv_buffered(ofd->fd, &msg, &link->pending_msg);
 	if (ret < 0) {
-		if (errno == EPIPE || errno == ECONNRESET) {
+		if (ret == -EAGAIN)
+			return;
+		if (ret == -EPIPE || ret == -ECONNRESET)
 			LOGP(DLINP, LOGL_ERROR, "lost connection with server\n");
-		}
 		ipa_client_conn_close(link);
 		if (link->updown_cb)
 			link->updown_cb(link, 0);
@@ -382,11 +465,12 @@  static void ipa_server_conn_read(struct ipa_server_conn *conn)
 
 	LOGP(DLINP, LOGL_DEBUG, "message received\n");
 
-	ret = ipa_msg_recv(ofd->fd, &msg);
+	ret = ipa_msg_recv_buffered(ofd->fd, &msg, &conn->pending_msg);
 	if (ret < 0) {
-		if (errno == EPIPE || errno == ECONNRESET) {
+		if (ret == -EAGAIN)
+			return;
+		if (ret == -EPIPE || ret == -ECONNRESET)
 			LOGP(DLINP, LOGL_ERROR, "lost connection with server\n");
-		}
 		ipa_server_conn_destroy(conn);
 		return;
 	} else if (ret == 0) {
@@ -471,6 +555,7 @@  ipa_server_conn_create(void *ctx, struct ipa_server_link *link, int fd,
 void ipa_server_conn_destroy(struct ipa_server_conn *conn)
 {
 	close(conn->ofd.fd);
+	msgb_free(conn->pending_msg);
 	osmo_fd_unregister(&conn->ofd);
 	if (conn->closed_cb)
 		conn->closed_cb(conn);
diff --git a/src/input/ipaccess.c b/src/input/ipaccess.c
index 225d70c..7ac5ad1 100644
--- a/src/input/ipaccess.c
+++ b/src/input/ipaccess.c
@@ -258,6 +258,8 @@  int ipaccess_rcvmsg_bts_base(struct msgb *msg,
 static int ipaccess_drop(struct osmo_fd *bfd, struct e1inp_line *line)
 {
 	int ret = 1;
+	unsigned int ts_nr = bfd->priv_nr;
+	struct e1inp_ts *e1i_ts = &line->ts[ts_nr-1];
 
 	/* Error case: we did not see any ID_RESP yet for this socket. */
 	if (bfd->fd != -1) {
@@ -269,6 +271,9 @@  static int ipaccess_drop(struct osmo_fd *bfd, struct e1inp_line *line)
 		ret = -ENOENT;
 	}
 
+	msgb_free(e1i_ts->pending_msg);
+	e1i_ts->pending_msg = NULL;
+
 	/* e1inp_sign_link_destroy releases the socket descriptors for us. */
 	line->ops->sign_link_down(line);
 
@@ -415,13 +420,15 @@  static int handle_ts1_read(struct osmo_fd *bfd)
 	struct e1inp_ts *e1i_ts = &line->ts[ts_nr-1];
 	struct e1inp_sign_link *link;
 	struct ipaccess_head *hh;
-	struct msgb *msg;
+	struct msgb *msg = NULL;
 	int ret;
 
-	ret = ipa_msg_recv(bfd->fd, &msg);
+	ret = ipa_msg_recv_buffered(bfd->fd, &msg, &e1i_ts->pending_msg);
 	if (ret < 0) {
+		if (ret == -EAGAIN)
+			return 0;
 		LOGP(DLINP, LOGL_NOTICE, "Sign link problems, "
-			"closing socket. Reason: %s\n", strerror(errno));
+			"closing socket. Reason: %s\n", strerror(-ret));
 		goto err;
 	} else if (ret == 0) {
 		LOGP(DLINP, LOGL_NOTICE, "Sign link vanished, dead socket\n");
diff --git a/tests/ipa_recv/ipa_recv_test.c b/tests/ipa_recv/ipa_recv_test.c
index 7b26259..8cdc7e2 100644
--- a/tests/ipa_recv/ipa_recv_test.c
+++ b/tests/ipa_recv/ipa_recv_test.c
@@ -86,7 +86,7 @@  static void append_ipa_message(struct msgb *msg, int proto, const char *text)
 		strcpy((char *)l2, text);
 }
 
-static int receive_messages(int fd)
+static int receive_messages(int fd, struct msgb **pending_msg)
 {
 	struct msgb *msg;
 	char dummy;
@@ -97,13 +97,22 @@  static int receive_messages(int fd)
 			break;
 		}
 		msg = NULL;
-		rc = ipa_msg_recv(fd, &msg);
-		if (rc == -1)
-			rc = -errno;
+		rc = ipa_msg_recv_buffered(fd, &msg, pending_msg);
+
 		fprintf(stderr,
-			"ipa_msg_recv: %d, msg %s NULL\n",
-			rc, msg ? "!=" : "==");
-		if (rc == -EAGAIN)
+			"ipa_msg_recv_buffered: %d, msg %s NULL, "
+			"pending_msg %s NULL\n",
+			rc, msg ? "!=" : "==",
+			!pending_msg ? "??" : *pending_msg ? "!=" : "==");
+		if (pending_msg && !!msg == !!*pending_msg)
+			printf( "got msg %s NULL, pending_msg %s NULL, "
+				"returned: %s\n",
+				msg ?  "!=" : "==",
+				*pending_msg ? "!=" : "==",
+				rc == 0 ? "EOF" :
+				rc > 0 ? "OK" :
+				strerror(-rc));
+		else if (!pending_msg && rc == -EAGAIN)
 			printf( "got msg %s NULL, "
 				"returned: %s\n",
 				msg ?  "!=" : "==",
@@ -115,7 +124,8 @@  static int receive_messages(int fd)
 		if (rc == -EAGAIN)
 			break;
 		if (rc < 0) {
-			printf("ipa_msg_recv failed with: %s\n", strerror(-rc));
+			printf("ipa_msg_recv_buffered failed with: %s\n",
+			       strerror(-rc));
 			return rc;
 		}
 		printf("got IPA message, size=%d, proto=%d, text=\"%s\"\n",
@@ -142,13 +152,15 @@  static int slurp_data(int fd) {
 	return count;
 };
 
-static void test_complete_recv(void)
+static void test_complete_recv(int do_not_assemble)
 {
 	int sv[2];
 	struct msgb *msg_out = msgb_alloc(4096, "msg_out");
+	struct msgb *pending_msg = NULL;
 	int rc, i;
 
-	printf("Testing IPA recv with complete messages.\n");
+	printf("Testing IPA recv with complete messages%s.\n",
+	       do_not_assemble ? "" : " with assembling enabled");
 
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
 		err(1, "socketpair");
@@ -166,7 +178,11 @@  static void test_complete_recv(void)
 	}
 
 	for (i=0; i < ARRAY_SIZE(ipa_test_messages); i++) {
-		rc = receive_messages(sv[0]);
+		rc = receive_messages(sv[0],
+				      do_not_assemble ? NULL : &pending_msg);
+		if (pending_msg)
+			printf("Unexpected partial message: size=%d\n",
+			       pending_msg->len);
 		if (rc == 0)
 			break;
 
@@ -181,16 +197,19 @@  static void test_complete_recv(void)
 	close(sv[0]);
 
 	msgb_free(msg_out);
+	msgb_free(pending_msg);
 }
 
 
-static void test_partial_recv(void)
+static void test_partial_recv(int do_not_assemble)
 {
 	int sv[2];
 	struct msgb *msg_out = msgb_alloc(4096, "msg_out");
+	struct msgb *pending_msg = NULL;
 	int rc, i;
 
-	printf("Testing IPA recv with partitioned messages.\n");
+	printf("Testing IPA recv with partitioned messages%s.\n",
+	       do_not_assemble ? "" : " with assembling enabled");
 
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
 		err(1, "socketpair");
@@ -211,7 +230,8 @@  static void test_partial_recv(void)
 		if (msg_out->len == 0)
 			shutdown(sv[1], SHUT_WR);
 
-		rc = receive_messages(sv[0]);
+		rc = receive_messages(sv[0],
+				      do_not_assemble ? NULL : &pending_msg);
 
 		if (rc == 0)
 			break;
@@ -226,6 +246,7 @@  static void test_partial_recv(void)
 	close(sv[0]);
 
 	msgb_free(msg_out);
+	msgb_free(pending_msg);
 }
 
 static struct log_info info = {};
@@ -239,8 +260,10 @@  int main(int argc, char **argv)
 	printf("Testing the IPA layer.\n");
 
 	/* run the tests */
-	test_complete_recv();
-	test_partial_recv();
+	test_complete_recv(1);
+	test_partial_recv(1);
+	test_complete_recv(0);
+	test_partial_recv(0);
 
 	printf("No crashes.\n");
 	return 0;
diff --git a/tests/ipa_recv/ipa_recv_test.ok b/tests/ipa_recv/ipa_recv_test.ok
index 4144d47..bdbfb7d 100644
--- a/tests/ipa_recv/ipa_recv_test.ok
+++ b/tests/ipa_recv/ipa_recv_test.ok
@@ -5,8 +5,31 @@  got IPA message, size=86, proto=200, text="A longer test message. ABCDEFGHIJKLMN
 got IPA message, size=16, proto=200, text="Hello again IPA"
 got IPA message, size=1, proto=200, text=""
 got IPA message, size=14, proto=200, text="Next is empty"
-done: unread 14, unsent 0
+got msg == NULL, returned: Resource temporarily unavailable
+got IPA message, size=4, proto=200, text="Bye"
+got IPA message, size=4, proto=200, text="Bye"
+done: unread 0, unsent 0
 Testing IPA recv with partitioned messages.
-ipa_msg_recv failed with: Input/output error
+ipa_msg_recv_buffered failed with: Input/output error
 done: unread 0, unsent 154
+Testing IPA recv with complete messages with assembling enabled.
+got IPA message, size=10, proto=200, text="Hello IPA"
+got IPA message, size=86, proto=200, text="A longer test message. ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz"
+got IPA message, size=16, proto=200, text="Hello again IPA"
+got IPA message, size=1, proto=200, text=""
+got IPA message, size=14, proto=200, text="Next is empty"
+got msg == NULL, pending_msg == NULL, returned: Resource temporarily unavailable
+got IPA message, size=4, proto=200, text="Bye"
+got IPA message, size=4, proto=200, text="Bye"
+done: unread 0, unsent 0
+Testing IPA recv with partitioned messages with assembling enabled.
+got IPA message, size=10, proto=200, text="Hello IPA"
+got IPA message, size=86, proto=200, text="A longer test message. ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz"
+got IPA message, size=16, proto=200, text="Hello again IPA"
+got IPA message, size=1, proto=200, text=""
+got IPA message, size=14, proto=200, text="Next is empty"
+got msg == NULL, pending_msg == NULL, returned: Resource temporarily unavailable
+got IPA message, size=4, proto=200, text="Bye"
+got IPA message, size=4, proto=200, text="Bye"
+done: unread 0, unsent 0
 No crashes.