diff mbox

[5/6] pipeio: Add fwts_pipe_readwrite

Message ID 1461303672-3648-6-git-send-email-jk@ozlabs.org
State Accepted
Headers show

Commit Message

Jeremy Kerr April 22, 2016, 5:41 a.m. UTC
This change adds a function to process input & output data of a process
opened by fwts_pipe_open_rw(). We add support for the input file
descriptor, and add poll() to allow reads & writes to the child process
(and use non-blocking IO on the file descriptors to make this possible).

We then implement the existing fwts_pipe_read() as a base-case of this
new function, by not providing input data to write.

Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
---
 src/lib/include/fwts_pipeio.h |   3 ++
 src/lib/src/fwts_pipeio.c     | 107 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 90 insertions(+), 20 deletions(-)

Comments

Alex Hung April 27, 2016, 4:31 a.m. UTC | #1
On 2016-04-22 01:41 PM, Jeremy Kerr wrote:
> This change adds a function to process input & output data of a process
> opened by fwts_pipe_open_rw(). We add support for the input file
> descriptor, and add poll() to allow reads & writes to the child process
> (and use non-blocking IO on the file descriptors to make this possible).
>
> We then implement the existing fwts_pipe_read() as a base-case of this
> new function, by not providing input data to write.
>
> Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
> ---
>   src/lib/include/fwts_pipeio.h |   3 ++
>   src/lib/src/fwts_pipeio.c     | 107 ++++++++++++++++++++++++++++++++++--------
>   2 files changed, 90 insertions(+), 20 deletions(-)
>
> diff --git a/src/lib/include/fwts_pipeio.h b/src/lib/include/fwts_pipeio.h
> index e2d498a..7f9aaf4 100644
> --- a/src/lib/include/fwts_pipeio.h
> +++ b/src/lib/include/fwts_pipeio.h
> @@ -34,6 +34,9 @@ int   fwts_pipe_open_ro(const char *command, pid_t *childpid, int *fd);
>   int   fwts_pipe_open_rw(const char *command, pid_t *childpid, int *in_fd,
>   		int *out_fd);
>   int   fwts_pipe_read(const int fd, char **out_buf, ssize_t *out_len);
> +int   fwts_pipe_readwrite(
> +		const int in_fd, const char *in_buf, const size_t in_len,
> +		const int out_fd, char **out_buf, ssize_t *out_len);
>   int   fwts_pipe_close(const int fd, const pid_t pid);
>   int   fwts_pipe_exec(const char *command, fwts_list **list, int *status);
>   int   fwts_exec(const char *command, int *status);
> diff --git a/src/lib/src/fwts_pipeio.c b/src/lib/src/fwts_pipeio.c
> index bb51515..a741477 100644
> --- a/src/lib/src/fwts_pipeio.c
> +++ b/src/lib/src/fwts_pipeio.c
> @@ -32,6 +32,7 @@
>   #include <paths.h>
>
>   #include <sys/param.h>
> +#include <sys/poll.h>
>   #include <sys/types.h>
>   #include <sys/wait.h>
>   #include <errno.h>
> @@ -128,51 +129,117 @@ int fwts_pipe_open_ro(const char *command, pid_t *childpid, int *fd)
>   	return fwts_pipe_open_rw(command, childpid, NULL, fd);
>   }
>
> +static int fwts_pipeio_set_nonblock(const int fd)
> +{
> +	int flags;
> +	if (fd < 0)
> +		return 0;
> +	flags = fcntl(fd, F_GETFL, 0);
> +	flags |= O_NONBLOCK;
> +	return !!fcntl(fd, F_SETFL, flags);
> +}
> +
>   /*
> - *  fwts_pipe_read()
> - *	read output from fwts_pipe_open_ro(), *out_buf is populated with
> - *	returned data (allocated, must be free()-ed after use), and length in
> - *	*out_len.
> + *  fwts_pipe_readwrite()
> + *	send input to and read output from fwts_pipe_open_rw(), in_len bytes
> + *	of in_buf are written to the pipe, and data is read into *out_buf,
> + *	*out_len indicating output length. *out_buf is allocated, and
> + *	must be free()-ed after use.
>    *	Returns non-zero on failure.
>    */
> -int fwts_pipe_read(const int fd, char **out_buf, ssize_t *out_len)
> +int fwts_pipe_readwrite(
> +		const int in_fd, const char *in_buf, const size_t in_len,
> +		const int out_fd, char **out_buf, ssize_t *out_len)
>   {
> +	struct pollfd pollfds[2];
> +	size_t in_size = in_len;
> +	ssize_t out_size = 0;
>   	char *ptr = NULL;
>   	char buffer[8192];
> -	ssize_t size = 0;
> +
>   	*out_len = 0;
>
> -	ptr = NULL;
> +	pollfds[0].fd = out_fd;
> +	pollfds[0].events = POLLIN;
> +	pollfds[1].fd = in_fd;
> +	pollfds[1].events = POLLOUT;
> +
> +	/* we need non-blocking IO */
> +	if (fwts_pipeio_set_nonblock(in_fd))
> +		return -1;
> +
> +	if (fwts_pipeio_set_nonblock(out_fd))
> +		return -1;
>
>   	for (;;) {
> -		ssize_t n = read(fd, buffer, sizeof(buffer));
> +		ssize_t n;
>   		char *tmp;
> +		int rc;
> +
> +		if (in_size == 0 || in_fd < 0 || in_buf == NULL)
> +			pollfds[1].events = 0;
>
> -		if (n == 0)
> +		rc = poll(pollfds, 2, -1);
> +		if (rc < 0)
>   			break;
> -		if (n < 0) {
> -			if (errno != EINTR && errno != EAGAIN) {
> +
> +		if (pollfds[0].revents) {
> +			n = read(out_fd, buffer, sizeof(buffer));
> +
> +			if (n == 0)
> +				break;
> +			if (n < 0) {
> +				if (errno != EINTR && errno != EAGAIN) {
> +					free(ptr);
> +					return -1;
> +				}
> +				continue;
> +			}
> +
> +			if ((tmp = realloc(ptr, out_size + n + 1)) == NULL) {
>   				free(ptr);
>   				return -1;
>   			}
> -			continue;
> +			ptr = tmp;
> +			memcpy(ptr + out_size, buffer, n);
> +			out_size += n;
> +			*(ptr+out_size) = 0;
>   		}
>
> -		if ((tmp = realloc(ptr, size + n + 1)) == NULL) {
> -			free(ptr);
> -			return -1;
> +		if (pollfds[1].revents) {
> +			n = write(in_fd, in_buf, in_size);
> +
> +			if (n < 0) {
> +				if (errno != EINTR && errno != EAGAIN) {
> +					free(ptr);
> +					return -1;
> +				}
> +				continue;
> +			}
> +
> +			in_buf += n;
> +			in_size -= n;
>   		}
> -		ptr = tmp;
> -		memcpy(ptr + size, buffer, n);
> -		size += n;
> -		*(ptr+size) = 0;
> +
>   	}
> -	*out_len = size;
> +
> +	*out_len = out_size;
>   	*out_buf = ptr;
>   	return 0;
>   }
>
>   /*
> + *  fwts_pipe_read()
> + *	read output from fwts_pipe_open_ro(), *length is
> + *	set to the number of chars read and we return
> + *	a buffer of read data.
> + */
> +int fwts_pipe_read(const int fd, char **out_buf, ssize_t *out_len)
> +{
> +	return fwts_pipe_readwrite(-1, NULL, 0, fd, out_buf, out_len);
> +}
> +
> +/*
>    *  fwts_pipe_close()
>    *	close fd, wait for child of given pid to exit
>    */
>

Acked-by: Alex Hung <alex.hung@canonical.com>
diff mbox

Patch

diff --git a/src/lib/include/fwts_pipeio.h b/src/lib/include/fwts_pipeio.h
index e2d498a..7f9aaf4 100644
--- a/src/lib/include/fwts_pipeio.h
+++ b/src/lib/include/fwts_pipeio.h
@@ -34,6 +34,9 @@  int   fwts_pipe_open_ro(const char *command, pid_t *childpid, int *fd);
 int   fwts_pipe_open_rw(const char *command, pid_t *childpid, int *in_fd,
 		int *out_fd);
 int   fwts_pipe_read(const int fd, char **out_buf, ssize_t *out_len);
+int   fwts_pipe_readwrite(
+		const int in_fd, const char *in_buf, const size_t in_len,
+		const int out_fd, char **out_buf, ssize_t *out_len);
 int   fwts_pipe_close(const int fd, const pid_t pid);
 int   fwts_pipe_exec(const char *command, fwts_list **list, int *status);
 int   fwts_exec(const char *command, int *status);
diff --git a/src/lib/src/fwts_pipeio.c b/src/lib/src/fwts_pipeio.c
index bb51515..a741477 100644
--- a/src/lib/src/fwts_pipeio.c
+++ b/src/lib/src/fwts_pipeio.c
@@ -32,6 +32,7 @@ 
 #include <paths.h>
 
 #include <sys/param.h>
+#include <sys/poll.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <errno.h>
@@ -128,51 +129,117 @@  int fwts_pipe_open_ro(const char *command, pid_t *childpid, int *fd)
 	return fwts_pipe_open_rw(command, childpid, NULL, fd);
 }
 
+static int fwts_pipeio_set_nonblock(const int fd)
+{
+	int flags;
+	if (fd < 0)
+		return 0;
+	flags = fcntl(fd, F_GETFL, 0);
+	flags |= O_NONBLOCK;
+	return !!fcntl(fd, F_SETFL, flags);
+}
+
 /*
- *  fwts_pipe_read()
- *	read output from fwts_pipe_open_ro(), *out_buf is populated with
- *	returned data (allocated, must be free()-ed after use), and length in
- *	*out_len.
+ *  fwts_pipe_readwrite()
+ *	send input to and read output from fwts_pipe_open_rw(), in_len bytes
+ *	of in_buf are written to the pipe, and data is read into *out_buf,
+ *	*out_len indicating output length. *out_buf is allocated, and
+ *	must be free()-ed after use.
  *	Returns non-zero on failure.
  */
-int fwts_pipe_read(const int fd, char **out_buf, ssize_t *out_len)
+int fwts_pipe_readwrite(
+		const int in_fd, const char *in_buf, const size_t in_len,
+		const int out_fd, char **out_buf, ssize_t *out_len)
 {
+	struct pollfd pollfds[2];
+	size_t in_size = in_len;
+	ssize_t out_size = 0;
 	char *ptr = NULL;
 	char buffer[8192];
-	ssize_t size = 0;
+
 	*out_len = 0;
 
-	ptr = NULL;
+	pollfds[0].fd = out_fd;
+	pollfds[0].events = POLLIN;
+	pollfds[1].fd = in_fd;
+	pollfds[1].events = POLLOUT;
+
+	/* we need non-blocking IO */
+	if (fwts_pipeio_set_nonblock(in_fd))
+		return -1;
+
+	if (fwts_pipeio_set_nonblock(out_fd))
+		return -1;
 
 	for (;;) {
-		ssize_t n = read(fd, buffer, sizeof(buffer));
+		ssize_t n;
 		char *tmp;
+		int rc;
+
+		if (in_size == 0 || in_fd < 0 || in_buf == NULL)
+			pollfds[1].events = 0;
 
-		if (n == 0)
+		rc = poll(pollfds, 2, -1);
+		if (rc < 0)
 			break;
-		if (n < 0) {
-			if (errno != EINTR && errno != EAGAIN) {
+
+		if (pollfds[0].revents) {
+			n = read(out_fd, buffer, sizeof(buffer));
+
+			if (n == 0)
+				break;
+			if (n < 0) {
+				if (errno != EINTR && errno != EAGAIN) {
+					free(ptr);
+					return -1;
+				}
+				continue;
+			}
+
+			if ((tmp = realloc(ptr, out_size + n + 1)) == NULL) {
 				free(ptr);
 				return -1;
 			}
-			continue;
+			ptr = tmp;
+			memcpy(ptr + out_size, buffer, n);
+			out_size += n;
+			*(ptr+out_size) = 0;
 		}
 
-		if ((tmp = realloc(ptr, size + n + 1)) == NULL) {
-			free(ptr);
-			return -1;
+		if (pollfds[1].revents) {
+			n = write(in_fd, in_buf, in_size);
+
+			if (n < 0) {
+				if (errno != EINTR && errno != EAGAIN) {
+					free(ptr);
+					return -1;
+				}
+				continue;
+			}
+
+			in_buf += n;
+			in_size -= n;
 		}
-		ptr = tmp;
-		memcpy(ptr + size, buffer, n);
-		size += n;
-		*(ptr+size) = 0;
+
 	}
-	*out_len = size;
+
+	*out_len = out_size;
 	*out_buf = ptr;
 	return 0;
 }
 
 /*
+ *  fwts_pipe_read()
+ *	read output from fwts_pipe_open_ro(), *length is
+ *	set to the number of chars read and we return
+ *	a buffer of read data.
+ */
+int fwts_pipe_read(const int fd, char **out_buf, ssize_t *out_len)
+{
+	return fwts_pipe_readwrite(-1, NULL, 0, fd, out_buf, out_len);
+}
+
+/*
  *  fwts_pipe_close()
  *	close fd, wait for child of given pid to exit
  */