From patchwork Wed Oct 5 07:17:34 2011 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Paolo Bonzini X-Patchwork-Id: 117751 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [140.186.70.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id CAED0B6F95 for ; Wed, 5 Oct 2011 18:18:37 +1100 (EST) Received: from localhost ([::1]:45353 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1RBLkJ-0008B7-Au for incoming@patchwork.ozlabs.org; Wed, 05 Oct 2011 03:18:27 -0400 Received: from eggs.gnu.org ([140.186.70.92]:33203) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1RBLjs-0007Qr-EU for qemu-devel@nongnu.org; Wed, 05 Oct 2011 03:18:03 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1RBLjl-0006FM-J8 for qemu-devel@nongnu.org; Wed, 05 Oct 2011 03:18:00 -0400 Received: from mail-wy0-f173.google.com ([74.125.82.173]:35870) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1RBLjl-0006EY-8S for qemu-devel@nongnu.org; Wed, 05 Oct 2011 03:17:53 -0400 Received: by mail-wy0-f173.google.com with SMTP id 22so1507475wyh.4 for ; Wed, 05 Oct 2011 00:17:52 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=sender:from:to:subject:date:message-id:x-mailer:in-reply-to :references; bh=ReFaQ9XBrLdYRjb1wE3q/UmJcsJeTfEvrzw27ZYXKMY=; b=wNEkQQ/DBXzyPED5JMTG/7Hk3Ou8N2xUcw+Fn5BcPFZKanjj4+SPTYY8wAh3f7brFx H30kEJjeiy4S67He3tx2fuYHa8Lr4FQ0z2JzXrbmzQifF2r1a1ePSMi+NpTUG9nVagDj NzhgaudZz9UWCTI2BDS51afAG5dZWMup9nmdQ= Received: by 10.227.2.132 with SMTP id 4mr2689827wbj.25.1317799072853; Wed, 05 Oct 2011 00:17:52 -0700 (PDT) Received: from localhost.localdomain (93-34-218-143.ip51.fastwebnet.it. [93.34.218.143]) by mx.google.com with ESMTPS id p2sm1192306wbo.17.2011.10.05.00.17.51 (version=TLSv1/SSLv3 cipher=OTHER); Wed, 05 Oct 2011 00:17:52 -0700 (PDT) From: Paolo Bonzini To: qemu-devel@nongnu.org Date: Wed, 5 Oct 2011 09:17:34 +0200 Message-Id: <1317799065-29668-5-git-send-email-pbonzini@redhat.com> X-Mailer: git-send-email 1.7.6 In-Reply-To: <1317799065-29668-1-git-send-email-pbonzini@redhat.com> References: <1317799065-29668-1-git-send-email-pbonzini@redhat.com> X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6 (newer, 2) X-Received-From: 74.125.82.173 Subject: [Qemu-devel] [PATCH v3 04/15] sheepdog: move coroutine send/recv function to generic code X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Outside coroutines, avoid busy waiting on EAGAIN by temporarily making the socket blocking. The API of qemu_recvv/qemu_sendv is slightly different from do_readv/do_writev because they do not handle coroutines. It returns the number of bytes written before encountering an EAGAIN. The specificity of yielding on EAGAIN is entirely in qemu-coroutine.c. Reviewed-by: MORITA Kazutaka Signed-off-by: Paolo Bonzini --- Makefile.objs | 2 +- block/sheepdog.c | 230 +++++---------------------------------------------- cutils.c | 111 +++++++++++++++++++++++++ osdep.c | 4 +- qemu-common.h | 32 +++++++- qemu-coroutine-io.c | 96 +++++++++++++++++++++ 6 files changed, 262 insertions(+), 213 deletions(-) create mode 100644 qemu-coroutine-io.c diff --git a/Makefile.objs b/Makefile.objs index 8d23fbb..55fd634 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -12,7 +12,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o ####################################################################### # coroutines -coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o +coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o ifeq ($(CONFIG_UCONTEXT_COROUTINE),y) coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o else diff --git a/block/sheepdog.c b/block/sheepdog.c index af696a5..d168681 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, return acb; } -#ifdef _WIN32 - -struct msghdr { - struct iovec *msg_iov; - size_t msg_iovlen; -}; - -static ssize_t sendmsg(int s, const struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } - - ret = send(s, buf, size, flags); - - g_free(buf); - return ret; -} - -static ssize_t recvmsg(int s, struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - ret = qemu_recv(s, buf, size, flags); - if (ret < 0) { - goto out; - } - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } -out: - g_free(buf); - return ret; -} - -#endif - -/* - * Send/recv data with iovec buffers - * - * This function send/recv data from/to the iovec buffer directly. - * The first `offset' bytes in the iovec buffer are skipped and next - * `len' bytes are used. - * - * For example, - * - * do_send_recv(sockfd, iov, len, offset, 1); - * - * is equals to - * - * char *buf = malloc(size); - * iov_to_buf(iov, iovcnt, buf, offset, size); - * send(sockfd, buf, size, 0); - * free(buf); - */ -static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset, - int write) -{ - struct msghdr msg; - int ret, diff; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - len += offset; - - while (iov->iov_len < len) { - len -= iov->iov_len; - - iov++; - msg.msg_iovlen++; - } - - diff = iov->iov_len - len; - iov->iov_len -= diff; - - while (msg.msg_iov->iov_len <= offset) { - offset -= msg.msg_iov->iov_len; - - msg.msg_iov++; - msg.msg_iovlen--; - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset; - msg.msg_iov->iov_len -= offset; - - if (write) { - ret = sendmsg(sockfd, &msg, 0); - } else { - ret = recvmsg(sockfd, &msg, 0); - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset; - msg.msg_iov->iov_len += offset; - - iov->iov_len += diff; - return ret; -} - static int connect_to_sdog(const char *addr, const char *port) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; @@ -618,83 +495,19 @@ success: return fd; } -static int do_readv_writev(int sockfd, struct iovec *iov, int len, - int iov_offset, int write) -{ - int ret; -again: - ret = do_send_recv(sockfd, iov, len, iov_offset, write); - if (ret < 0) { - if (errno == EINTR) { - goto again; - } - if (errno == EAGAIN) { - if (qemu_in_coroutine()) { - qemu_coroutine_yield(); - } - goto again; - } - error_report("failed to recv a rsp, %s", strerror(errno)); - return 1; - } - - iov_offset += ret; - len -= ret; - if (len) { - goto again; - } - - return 0; -} - -static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 0); -} - -static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 1); -} - -static int do_read_write(int sockfd, void *buf, int len, int write) -{ - struct iovec iov; - - iov.iov_base = buf; - iov.iov_len = len; - - return do_readv_writev(sockfd, &iov, len, 0, write); -} - -static int do_read(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 0); -} - -static int do_write(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 1); -} - static int send_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen) { int ret; - struct iovec iov[2]; - iov[0].iov_base = hdr; - iov[0].iov_len = sizeof(*hdr); - - if (*wlen) { - iov[1].iov_base = data; - iov[1].iov_len = *wlen; + ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { + error_report("failed to send a req, %s", strerror(errno)); } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { + ret = qemu_send_full(sockfd, data, *wlen, 0); + if (ret < *wlen) { error_report("failed to send a req, %s", strerror(errno)); - ret = -1; } return ret; @@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, { int ret; + socket_set_block(sockfd); ret = send_req(sockfd, hdr, data, wlen); - if (ret) { - ret = -1; + if (ret < 0) { goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { + ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { error_report("failed to get a rsp, %s", strerror(errno)); - ret = -1; goto out; } @@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, } if (*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + ret = qemu_recv_full(sockfd, data, *rlen, 0); + if (ret < *rlen) { error_report("failed to get the data, %s", strerror(errno)); - ret = -1; goto out; } } ret = 0; out: + socket_set_nonblock(sockfd); return ret; } @@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque) } /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { + ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); + if (ret < 0) { error_report("failed to get the header, %s", strerror(errno)); goto out; } @@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque) } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { + ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length, + aio_req->iov_offset); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); goto out; } @@ -1114,15 +926,15 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, set_cork(s->fd, 1); /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { + ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); + if (ret < 0) { error_report("failed to send a req, %s", strerror(errno)); return -EIO; } if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { + ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset); + if (ret < 0) { error_report("failed to send a data, %s", strerror(errno)); return -EIO; } diff --git a/cutils.c b/cutils.c index c91f887..8ba8cb1 100644 --- a/cutils.c +++ b/cutils.c @@ -25,6 +25,8 @@ #include "host-utils.h" #include +#include "qemu_socket.h" + void pstrcpy(char *buf, int buf_size, const char *str) { int c; @@ -415,3 +417,112 @@ int64_t strtosz(const char *nptr, char **end) { return strtosz_suffix(nptr, end, STRTOSZ_DEFSUFFIX_MB); } + +/* + * Send/recv data with iovec buffers + * + * This function send/recv data from/to the iovec buffer directly. + * The first `offset' bytes in the iovec buffer are skipped and next + * `len' bytes are used. + * + * For example, + * + * do_sendv_recvv(sockfd, iov, len, offset, 1); + * + * is equal to + * + * char *buf = malloc(size); + * iov_to_buf(iov, iovcnt, buf, offset, size); + * send(sockfd, buf, size, 0); + * free(buf); + */ +static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset, + int do_sendv) +{ + int ret, diff, iovlen; + struct iovec *last_iov; + + /* last_iov is inclusive, so count from one. */ + iovlen = 1; + last_iov = iov; + len += offset; + + while (last_iov->iov_len < len) { + len -= last_iov->iov_len; + + last_iov++; + iovlen++; + } + + diff = last_iov->iov_len - len; + last_iov->iov_len -= diff; + + while (iov->iov_len <= offset) { + offset -= iov->iov_len; + + iov++; + iovlen--; + } + + iov->iov_base = (char *) iov->iov_base + offset; + iov->iov_len -= offset; + + { +#ifdef CONFIG_IOVEC + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + + do { + if (do_sendv) { + ret = sendmsg(sockfd, &msg, 0); + } else { + ret = recvmsg(sockfd, &msg, 0); + } + } while (ret == -1 && errno == EINTR); +#else + struct iovec *p = iov; + ret = 0; + while (iovlen > 0) { + int rc; + if (do_sendv) { + rc = send(sockfd, p->iov_base, p->iov_len, 0); + } else { + rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0); + } + if (rc == -1) { + if (errno == EINTR) { + continue; + } + if (ret == 0) { + ret = -1; + } + break; + } + if (rc == 0) { + break; + } + ret += rc; + iovlen--, p++; + } +#endif + } + + /* Undo the changes above */ + iov->iov_base = (char *) iov->iov_base - offset; + iov->iov_len += offset; + last_iov->iov_len += diff; + return ret; +} + +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 0); +} + +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 1); +} + diff --git a/osdep.c b/osdep.c index 718a25d..70bad27 100644 --- a/osdep.c +++ b/osdep.c @@ -211,13 +211,13 @@ ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) * - return a short write (then name is wrong) * - busy wait adding (errno == EAGAIN) to the loop */ -ssize_t qemu_recv_full(int fd, const void *buf, size_t count, int flags) +ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) { ssize_t ret = 0; ssize_t total = 0; while (count) { - ret = recv(fd, buf, count, flags); + ret = qemu_recv(fd, buf, count, flags); if (ret <= 0) { if (ret < 0 && errno == EINTR) { continue; diff --git a/qemu-common.h b/qemu-common.h index ed9112a..b1b8053 100644 --- a/qemu-common.h +++ b/qemu-common.h @@ -191,7 +191,7 @@ ssize_t qemu_write_full(int fd, const void *buf, size_t count) QEMU_WARN_UNUSED_RESULT; ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) QEMU_WARN_UNUSED_RESULT; -ssize_t qemu_recv_full(int fd, const void *buf, size_t count, int flags) +ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) QEMU_WARN_UNUSED_RESULT; void qemu_set_cloexec(int fd); @@ -207,6 +207,9 @@ int qemu_pipe(int pipefd[2]); #define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags) #endif +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset); +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset); + /* Error handling. */ void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2); @@ -303,6 +306,33 @@ struct qemu_work_item { void qemu_init_vcpu(void *env); #endif +/** + * Sends an iovec (or optionally a part of it) down a socket, yielding + * when the socket is full. + */ +int qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset); + +/** + * Receives data into an iovec (or optionally into a part of it) from + * a socket, yielding when there is no data in the socket. + */ +int qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset); + + +/** + * Sends a buffer down a socket, yielding when the socket is full. + */ +int qemu_co_send(int sockfd, void *buf, int len); + +/** + * Receives data into a buffer from a socket, yielding when there + * is no data in the socket. + */ +int qemu_co_recv(int sockfd, void *buf, int len); + + typedef struct QEMUIOVector { struct iovec *iov; int niov; diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c new file mode 100644 index 0000000..40fd514 --- /dev/null +++ b/qemu-coroutine-io.c @@ -0,0 +1,96 @@ +/* + * Coroutine-aware I/O functions + * + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation. + * Copyright (c) 2011, Red Hat, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include "qemu-common.h" +#include "qemu_socket.h" +#include "qemu-coroutine.h" + +int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_recvv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + if (ret == 0) { + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_sendv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_recvv(sockfd, &iov, len, 0); +} + +int coroutine_fn qemu_co_send(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_sendv(sockfd, &iov, len, 0); +}