diff mbox

[v3,07/10] sheepdog: try to reconnect to sheepdog after network error

Message ID 1374741125-31859-8-git-send-email-morita.kazutaka@lab.ntt.co.jp
State New
Headers show

Commit Message

MORITA Kazutaka July 25, 2013, 8:32 a.m. UTC
This introduces a failed request queue and links all the inflight
requests to the list after network error happens.  After QEMU
reconnects to the sheepdog server successfully, the sheepdog block
driver will retry all the requests in the failed queue.

Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
---
 block/sheepdog.c | 72 ++++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 57 insertions(+), 15 deletions(-)

Comments

Liu Yuan July 25, 2013, 9:13 a.m. UTC | #1
On Thu, Jul 25, 2013 at 05:32:02PM +0900, MORITA Kazutaka wrote:
> This introduces a failed request queue and links all the inflight
> requests to the list after network error happens.  After QEMU
> reconnects to the sheepdog server successfully, the sheepdog block
> driver will retry all the requests in the failed queue.
> 
> Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
> ---
>  block/sheepdog.c | 72 ++++++++++++++++++++++++++++++++++++++++++++------------
>  1 file changed, 57 insertions(+), 15 deletions(-)
> 
> diff --git a/block/sheepdog.c b/block/sheepdog.c
> index 7b22816..43a6feb 100644
> --- a/block/sheepdog.c
> +++ b/block/sheepdog.c
> @@ -318,8 +318,11 @@ typedef struct BDRVSheepdogState {
>      Coroutine *co_recv;
>  
>      uint32_t aioreq_seq_num;
> +
> +    /* Every aio request must be linked to either of these queues. */
>      QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
>      QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
> +    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
>  } BDRVSheepdogState;
>  
>  static const char * sd_strerror(int err)
> @@ -613,6 +616,8 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
>                             enum AIOCBState aiocb_type);
>  static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
>  static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
> +static int get_sheep_fd(BDRVSheepdogState *s);
> +static void co_write_request(void *opaque);
>  
>  static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
>  {
> @@ -654,6 +659,44 @@ static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
>      }
>  }
>  
> +static coroutine_fn void reconnect_to_sdog(void *opaque)
> +{
> +    BDRVSheepdogState *s = opaque;
> +    AIOReq *aio_req, *next;
> +
> +    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
> +    close(s->fd);
> +    s->fd = -1;
> +
> +    /* Wait for outstanding write requests to be completed. */
> +    while (s->co_send != NULL) {
> +        co_write_request(opaque);
> +    }
> +
> +    /* Try to reconnect the sheepdog server every one second. */
> +    while (s->fd < 0) {
> +        s->fd = get_sheep_fd(s);
> +        if (s->fd < 0) {
> +            dprintf("Wait for connection to be established\n");
> +            co_aio_sleep_ns(1000000000ULL);
> +        }
> +    };
> +
> +    /* Move all the inflight requests to the failed queue. */
> +    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> +        QLIST_REMOVE(aio_req, aio_siblings);
> +        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
> +    }
> +
> +    /* Resend all the failed aio requests. */
> +    while (!QLIST_EMPTY(&s->failed_aio_head)) {
> +        aio_req = QLIST_FIRST(&s->failed_aio_head);
> +        QLIST_REMOVE(aio_req, aio_siblings);
> +        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
> +        resend_aioreq(s, aio_req);
> +    }
> +}
> +

Is failed queue necessary? Here you just move requests from inflight queue to
failed queue, then interate the failed queue to send them all.

Isn't it simpler we just resend the requests in the inflight queue like

> +    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> +        resend_aioreq(s, aio_req);
> +    }

Thanks
Yuan
MORITA Kazutaka July 25, 2013, 12:53 p.m. UTC | #2
At Thu, 25 Jul 2013 17:13:46 +0800,
Liu Yuan wrote:
> 
> > +
> > +    /* Try to reconnect the sheepdog server every one second. */
> > +    while (s->fd < 0) {
> > +        s->fd = get_sheep_fd(s);
> > +        if (s->fd < 0) {
> > +            dprintf("Wait for connection to be established\n");
> > +            co_aio_sleep_ns(1000000000ULL);
> > +        }
> > +    };
> > +
> > +    /* Move all the inflight requests to the failed queue. */
> > +    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> > +        QLIST_REMOVE(aio_req, aio_siblings);
> > +        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
> > +    }
> > +
> > +    /* Resend all the failed aio requests. */
> > +    while (!QLIST_EMPTY(&s->failed_aio_head)) {
> > +        aio_req = QLIST_FIRST(&s->failed_aio_head);
> > +        QLIST_REMOVE(aio_req, aio_siblings);
> > +        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
> > +        resend_aioreq(s, aio_req);
> > +    }
> > +}
> > +
> 
> Is failed queue necessary? Here you just move requests from inflight queue to
> failed queue, then interate the failed queue to send them all.
> 
> Isn't it simpler we just resend the requests in the inflight queue like
> 
> > +    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> > +        resend_aioreq(s, aio_req);
> > +    }

resend_aioreq() can yield and a new aio request can be added to the
inflight queue during this loop.  To avoid mixing new requests and
failed ones, I think the failed queue is necessary.

Thanks,

Kazutaka
Liu Yuan July 25, 2013, 1:20 p.m. UTC | #3
On Thu, Jul 25, 2013 at 09:53:14PM +0900, MORITA Kazutaka wrote:
> At Thu, 25 Jul 2013 17:13:46 +0800,
> Liu Yuan wrote:
> > 
> > > +
> > > +    /* Try to reconnect the sheepdog server every one second. */
> > > +    while (s->fd < 0) {
> > > +        s->fd = get_sheep_fd(s);
> > > +        if (s->fd < 0) {
> > > +            dprintf("Wait for connection to be established\n");
> > > +            co_aio_sleep_ns(1000000000ULL);
> > > +        }
> > > +    };
> > > +
> > > +    /* Move all the inflight requests to the failed queue. */
> > > +    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> > > +        QLIST_REMOVE(aio_req, aio_siblings);
> > > +        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
> > > +    }
> > > +
> > > +    /* Resend all the failed aio requests. */
> > > +    while (!QLIST_EMPTY(&s->failed_aio_head)) {
> > > +        aio_req = QLIST_FIRST(&s->failed_aio_head);
> > > +        QLIST_REMOVE(aio_req, aio_siblings);
> > > +        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
> > > +        resend_aioreq(s, aio_req);
> > > +    }
> > > +}
> > > +
> > 
> > Is failed queue necessary? Here you just move requests from inflight queue to
> > failed queue, then interate the failed queue to send them all.
> > 
> > Isn't it simpler we just resend the requests in the inflight queue like
> > 
> > > +    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> > > +        resend_aioreq(s, aio_req);
> > > +    }
> 
> resend_aioreq() can yield and a new aio request can be added to the
> inflight queue during this loop.  To avoid mixing new requests and
> failed ones, I think the failed queue is necessary.
> 

Okay, make sense. This should be included in the source file. You can add my

Tested-and-reviewed-by: Liu Yuan <namei.unix@gmail.com>

to sheepdog patches

Thanks,
Yuan
MORITA Kazutaka July 26, 2013, 5:53 a.m. UTC | #4
At Thu, 25 Jul 2013 21:20:43 +0800,
Liu Yuan wrote:
> 
> On Thu, Jul 25, 2013 at 09:53:14PM +0900, MORITA Kazutaka wrote:
> > At Thu, 25 Jul 2013 17:13:46 +0800,
> > Liu Yuan wrote:
> > > 
> > > > +
> > > > +    /* Try to reconnect the sheepdog server every one second. */
> > > > +    while (s->fd < 0) {
> > > > +        s->fd = get_sheep_fd(s);
> > > > +        if (s->fd < 0) {
> > > > +            dprintf("Wait for connection to be established\n");
> > > > +            co_aio_sleep_ns(1000000000ULL);
> > > > +        }
> > > > +    };
> > > > +
> > > > +    /* Move all the inflight requests to the failed queue. */
> > > > +    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> > > > +        QLIST_REMOVE(aio_req, aio_siblings);
> > > > +        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
> > > > +    }
> > > > +
> > > > +    /* Resend all the failed aio requests. */
> > > > +    while (!QLIST_EMPTY(&s->failed_aio_head)) {
> > > > +        aio_req = QLIST_FIRST(&s->failed_aio_head);
> > > > +        QLIST_REMOVE(aio_req, aio_siblings);
> > > > +        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
> > > > +        resend_aioreq(s, aio_req);
> > > > +    }
> > > > +}
> > > > +
> > > 
> > > Is failed queue necessary? Here you just move requests from inflight queue to
> > > failed queue, then interate the failed queue to send them all.
> > > 
> > > Isn't it simpler we just resend the requests in the inflight queue like
> > > 
> > > > +    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings, next) {
> > > > +        resend_aioreq(s, aio_req);
> > > > +    }
> > 
> > resend_aioreq() can yield and a new aio request can be added to the
> > inflight queue during this loop.  To avoid mixing new requests and
> > failed ones, I think the failed queue is necessary.
> > 
> 
> Okay, make sense. This should be included in the source file. You can add my

Okay, thanks for your review and comments.

Kazutaka

> 
> Tested-and-reviewed-by: Liu Yuan <namei.unix@gmail.com>
> 
> to sheepdog patches
> 
> Thanks,
> Yuan
diff mbox

Patch

diff --git a/block/sheepdog.c b/block/sheepdog.c
index 7b22816..43a6feb 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -318,8 +318,11 @@  typedef struct BDRVSheepdogState {
     Coroutine *co_recv;
 
     uint32_t aioreq_seq_num;
+
+    /* Every aio request must be linked to either of these queues. */
     QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
     QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
+    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
 } BDRVSheepdogState;
 
 static const char * sd_strerror(int err)
@@ -613,6 +616,8 @@  static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                            enum AIOCBState aiocb_type);
 static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
+static int get_sheep_fd(BDRVSheepdogState *s);
+static void co_write_request(void *opaque);
 
 static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
 {
@@ -654,6 +659,44 @@  static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
     }
 }
 
+static coroutine_fn void reconnect_to_sdog(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+    AIOReq *aio_req, *next;
+
+    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
+    close(s->fd);
+    s->fd = -1;
+
+    /* Wait for outstanding write requests to be completed. */
+    while (s->co_send != NULL) {
+        co_write_request(opaque);
+    }
+
+    /* Try to reconnect the sheepdog server every one second. */
+    while (s->fd < 0) {
+        s->fd = get_sheep_fd(s);
+        if (s->fd < 0) {
+            dprintf("Wait for connection to be established\n");
+            co_aio_sleep_ns(1000000000ULL);
+        }
+    };
+
+    /* Move all the inflight requests to the failed queue. */
+    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
+    }
+
+    /* Resend all the failed aio requests. */
+    while (!QLIST_EMPTY(&s->failed_aio_head)) {
+        aio_req = QLIST_FIRST(&s->failed_aio_head);
+        QLIST_REMOVE(aio_req, aio_siblings);
+        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
+        resend_aioreq(s, aio_req);
+    }
+}
+
 /*
  * Receive responses of the I/O requests.
  *
@@ -670,15 +713,11 @@  static void coroutine_fn aio_read_response(void *opaque)
     SheepdogAIOCB *acb;
     uint64_t idx;
 
-    if (QLIST_EMPTY(&s->inflight_aio_head)) {
-        goto out;
-    }
-
     /* read a header */
     ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
     if (ret != sizeof(rsp)) {
         error_report("failed to get the header, %s", strerror(errno));
-        goto out;
+        goto err;
     }
 
     /* find the right aio_req from the inflight aio list */
@@ -689,7 +728,7 @@  static void coroutine_fn aio_read_response(void *opaque)
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        goto out;
+        goto err;
     }
 
     acb = aio_req->aiocb;
@@ -729,7 +768,7 @@  static void coroutine_fn aio_read_response(void *opaque)
                             aio_req->iov_offset, rsp.data_length);
         if (ret != rsp.data_length) {
             error_report("failed to get the data, %s", strerror(errno));
-            goto out;
+            goto err;
         }
         break;
     case AIOCB_FLUSH_CACHE:
@@ -763,10 +802,9 @@  static void coroutine_fn aio_read_response(void *opaque)
         if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
             ret = reload_inode(s, 0, "");
             if (ret < 0) {
-                goto out;
+                goto err;
             }
         }
-
         if (is_data_obj(aio_req->oid)) {
             aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
                                            data_oid_to_idx(aio_req->oid));
@@ -794,6 +832,10 @@  static void coroutine_fn aio_read_response(void *opaque)
     }
 out:
     s->co_recv = NULL;
+    return;
+err:
+    s->co_recv = NULL;
+    reconnect_to_sdog(opaque);
 }
 
 static void co_read_response(void *opaque)
@@ -819,7 +861,8 @@  static int aio_flush_request(void *opaque)
     BDRVSheepdogState *s = opaque;
 
     return !QLIST_EMPTY(&s->inflight_aio_head) ||
-        !QLIST_EMPTY(&s->pending_aio_head);
+        !QLIST_EMPTY(&s->pending_aio_head) ||
+        !QLIST_EMPTY(&s->failed_aio_head);
 }
 
 /*
@@ -1094,23 +1137,21 @@  static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
     /* send a header */
     ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
     if (ret != sizeof(hdr)) {
-        qemu_co_mutex_unlock(&s->lock);
         error_report("failed to send a req, %s", strerror(errno));
-        return -errno;
+        goto out;
     }
 
     if (wlen) {
         ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
         if (ret != wlen) {
-            qemu_co_mutex_unlock(&s->lock);
             error_report("failed to send a data, %s", strerror(errno));
-            return -errno;
         }
     }
-
+out:
     socket_set_cork(s->fd, 0);
     qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
                             aio_flush_request, s);
+    s->co_send = NULL;
     qemu_co_mutex_unlock(&s->lock);
 
     return 0;
@@ -1300,6 +1341,7 @@  static int sd_open(BlockDriverState *bs, QDict *options, int flags)
 
     QLIST_INIT(&s->inflight_aio_head);
     QLIST_INIT(&s->pending_aio_head);
+    QLIST_INIT(&s->failed_aio_head);
     s->fd = -1;
 
     memset(vdi, 0, sizeof(vdi));