[v8,2/3] block/nbd: nbd reconnect
diff mbox series

Message ID 20190821165215.61406-3-vsementsov@virtuozzo.com
State New
Headers show
Series
  • NBD reconnect
Related show

Commit Message

Vladimir Sementsov-Ogievskiy Aug. 21, 2019, 4:52 p.m. UTC
Implement reconnect. To achieve this:

1. add new modes:
   connecting-wait: means, that reconnecting is in progress, and there
     were small number of reconnect attempts, so all requests are
     waiting for the connection.
   connecting-nowait: reconnecting is in progress, there were a lot of
     attempts of reconnect, all requests will return errors.

   two old modes are used too:
   connected: normal state
   quit: exiting after fatal error or on close

Possible transitions are:

   * -> quit
   connecting-* -> connected
   connecting-wait -> connecting-nowait (transition is done after
                      reconnect-delay seconds in connecting-wait mode)
   connected -> connecting-wait

2. Implement reconnect in connection_co. So, in connecting-* mode,
    connection_co, tries to reconnect unlimited times.

3. Retry nbd queries on channel error, if we are in connecting-wait
    state.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 block/nbd.c | 335 ++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 271 insertions(+), 64 deletions(-)

Comments

Eric Blake Aug. 21, 2019, 5:35 p.m. UTC | #1
On 8/21/19 11:52 AM, Vladimir Sementsov-Ogievskiy wrote:
> Implement reconnect. To achieve this:
> 
> 1. add new modes:
>    connecting-wait: means, that reconnecting is in progress, and there
>      were small number of reconnect attempts, so all requests are
>      waiting for the connection.
>    connecting-nowait: reconnecting is in progress, there were a lot of
>      attempts of reconnect, all requests will return errors.
> 
>    two old modes are used too:
>    connected: normal state
>    quit: exiting after fatal error or on close
> 
> Possible transitions are:
> 
>    * -> quit
>    connecting-* -> connected
>    connecting-wait -> connecting-nowait (transition is done after
>                       reconnect-delay seconds in connecting-wait mode)
>    connected -> connecting-wait
> 
> 2. Implement reconnect in connection_co. So, in connecting-* mode,
>     connection_co, tries to reconnect unlimited times.
> 
> 3. Retry nbd queries on channel error, if we are in connecting-wait
>     state.
> 
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
> ---

> +static bool nbd_client_connecting(BDRVNBDState *s)
> +{
> +    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
> +            s->state == NBD_CLIENT_CONNECTING_NOWAIT;


Indentation looks unusual. I might have done:

    return (s->state == NBD_CLIENT_CONNECTING_WAIT ||
            s->state == NBD_CLIENT_CONNECTING_NOWAIT);

Or even exploit the enum encoding:

    return s->state <= NBD_CLIENT_CONNECTING_NOWAIT

Is s->state updated atomically, or do we risk the case where we might
see two different values of s->state across the || sequence point?  Does
that matter?

> +}
> +
> +static bool nbd_client_connecting_wait(BDRVNBDState *s)
> +{
> +    return s->state == NBD_CLIENT_CONNECTING_WAIT;
> +}
> +
> +static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
> +{
> +    Error *local_err = NULL;
> +
> +    if (!nbd_client_connecting(s)) {
> +        return;
> +    }
> +    assert(nbd_client_connecting(s));

This assert adds nothing given the condition beforehand.

> +
> +    /* Wait for completion of all in-flight requests */
> +
> +    qemu_co_mutex_lock(&s->send_mutex);
> +
> +    while (s->in_flight > 0) {
> +        qemu_co_mutex_unlock(&s->send_mutex);
> +        nbd_recv_coroutines_wake_all(s);
> +        s->wait_in_flight = true;
> +        qemu_coroutine_yield();
> +        s->wait_in_flight = false;
> +        qemu_co_mutex_lock(&s->send_mutex);
> +    }
> +
> +    qemu_co_mutex_unlock(&s->send_mutex);
> +
> +    if (!nbd_client_connecting(s)) {
> +        return;
> +    }
> +
> +    /*
> +     * Now we are sure that nobody is accessing the channel, and no one will
> +     * try until we set the state to CONNECTED.
> +     */
> +
> +    /* Finalize previous connection if any */
> +    if (s->ioc) {
> +        nbd_client_detach_aio_context(s->bs);
> +        object_unref(OBJECT(s->sioc));
> +        s->sioc = NULL;
> +        object_unref(OBJECT(s->ioc));
> +        s->ioc = NULL;
> +    }
> +
> +    s->connect_status = nbd_client_connect(s->bs, &local_err);
> +    error_free(s->connect_err);
> +    s->connect_err = NULL;
> +    error_propagate(&s->connect_err, local_err);
> +    local_err = NULL;
> +
> +    if (s->connect_status < 0) {
> +        /* failed attempt */
> +        return;
> +    }
> +
> +    /* successfully connected */
> +    s->state = NBD_CLIENT_CONNECTED;
> +    qemu_co_queue_restart_all(&s->free_sema);
> +}
> +
> +static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s)
> +{
> +    uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> +    uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
> +    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
> +    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
> +
> +    nbd_reconnect_attempt(s);
> +
> +    while (nbd_client_connecting(s)) {
> +        if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
> +            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
> +        {
> +            s->state = NBD_CLIENT_CONNECTING_NOWAIT;
> +            qemu_co_queue_restart_all(&s->free_sema);
> +        }
> +
> +        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, timeout,
> +                         &s->connection_co_sleep_ns_state);
> +        if (s->drained) {
> +            bdrv_dec_in_flight(s->bs);
> +            s->wait_drained_end = true;
> +            while (s->drained) {
> +                /*
> +                 * We may be entered once from nbd_client_attach_aio_context_bh
> +                 * and then from nbd_client_co_drain_end. So here is a loop.
> +                 */
> +                qemu_coroutine_yield();
> +            }
> +            bdrv_inc_in_flight(s->bs);
> +        }
> +        if (timeout < max_timeout) {
> +            timeout *= 2;
> +        }
> +
> +        nbd_reconnect_attempt(s);
> +    }
>  }
>  
>  static coroutine_fn void nbd_connection_entry(void *opaque)
>  {
> -    BDRVNBDState *s = opaque;
> +    BDRVNBDState *s = (BDRVNBDState *)opaque;

The cast is not necessary.

>      uint64_t i;
>      int ret = 0;
>      Error *local_err = NULL;
> @@ -177,16 +331,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
>           * Therefore we keep an additional in_flight reference all the time and
>           * only drop it temporarily here.
>           */
> +
> +        if (nbd_client_connecting(s)) {
> +            nbd_reconnect_loop(s);
> +        }
> +
> +        if (s->state != NBD_CLIENT_CONNECTED) {
> +            continue;
> +        }
> +
>          assert(s->reply.handle == 0);
>          ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
>  
>          if (local_err) {
>              trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
>              error_free(local_err);
> +            local_err = NULL;
>          }
>          if (ret <= 0) {
>              nbd_channel_error(s, ret ? ret : -EIO);
> -            break;
> +            continue;
>          }
>  
>          /*
> @@ -201,7 +365,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
>              (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
>          {
>              nbd_channel_error(s, -EINVAL);
> -            break;
> +            continue;
>          }
>  

The commit message says you re-attempt the request after reconnection if
you have not yet timed out from the previous connection; but do you also
need to clear out any partial reply received to make sure the new
request isn't operating on stale assumptions left over if the server
died between two structured chunks?


> @@ -927,20 +1113,26 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
>      } else {
>          assert(request->type != NBD_CMD_WRITE);
>      }
> -    ret = nbd_co_send_request(bs, request, write_qiov);
> -    if (ret < 0) {
> -        return ret;
> -    }
>  
> -    ret = nbd_co_receive_return_code(s, request->handle,
> -                                     &request_ret, &local_err);
> -    if (local_err) {
> -        trace_nbd_co_request_fail(request->from, request->len, request->handle,
> -                                  request->flags, request->type,
> -                                  nbd_cmd_lookup(request->type),
> -                                  ret, error_get_pretty(local_err));
> -        error_free(local_err);
> -    }
> +    do {
> +        ret = nbd_co_send_request(bs, request, write_qiov);
> +        if (ret < 0) {
> +            continue;
> +        }
> +
> +        ret = nbd_co_receive_return_code(s, request->handle,
> +                                         &request_ret, &local_err);
> +        if (local_err) {
> +            trace_nbd_co_request_fail(request->from, request->len,
> +                                      request->handle, request->flags,
> +                                      request->type,
> +                                      nbd_cmd_lookup(request->type),
> +                                      ret, error_get_pretty(local_err));
> +            error_free(local_err);
> +            local_err = NULL;
> +        }
> +    } while (ret < 0 && nbd_client_connecting_wait(s));

I ask because nothing seems to reset request_ret here in the new loop.
Vladimir Sementsov-Ogievskiy Aug. 22, 2019, 11:58 a.m. UTC | #2
21.08.2019 20:35, Eric Blake wrote:
> On 8/21/19 11:52 AM, Vladimir Sementsov-Ogievskiy wrote:
>> Implement reconnect. To achieve this:
>>
>> 1. add new modes:
>>     connecting-wait: means, that reconnecting is in progress, and there
>>       were small number of reconnect attempts, so all requests are
>>       waiting for the connection.
>>     connecting-nowait: reconnecting is in progress, there were a lot of
>>       attempts of reconnect, all requests will return errors.
>>
>>     two old modes are used too:
>>     connected: normal state
>>     quit: exiting after fatal error or on close
>>
>> Possible transitions are:
>>
>>     * -> quit
>>     connecting-* -> connected
>>     connecting-wait -> connecting-nowait (transition is done after
>>                        reconnect-delay seconds in connecting-wait mode)
>>     connected -> connecting-wait
>>
>> 2. Implement reconnect in connection_co. So, in connecting-* mode,
>>      connection_co, tries to reconnect unlimited times.
>>
>> 3. Retry nbd queries on channel error, if we are in connecting-wait
>>      state.
>>
>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
>> ---
> 
>> +static bool nbd_client_connecting(BDRVNBDState *s)
>> +{
>> +    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
>> +            s->state == NBD_CLIENT_CONNECTING_NOWAIT;
> 
> 
> Indentation looks unusual. I might have done:
> 
>      return (s->state == NBD_CLIENT_CONNECTING_WAIT ||
>              s->state == NBD_CLIENT_CONNECTING_NOWAIT);
> 
> Or even exploit the enum encoding:
> 
>      return s->state <= NBD_CLIENT_CONNECTING_NOWAIT
> 
> Is s->state updated atomically, or do we risk the case where we might
> see two different values of s->state across the || sequence point?  Does
> that matter?

I hope it all happens in one aio context so state change should not intersects with this
function as it doesn't yield.

> 
>> +}
>> +
>> +static bool nbd_client_connecting_wait(BDRVNBDState *s)
>> +{
>> +    return s->state == NBD_CLIENT_CONNECTING_WAIT;
>> +}
>> +
>> +static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>> +{
>> +    Error *local_err = NULL;
>> +
>> +    if (!nbd_client_connecting(s)) {
>> +        return;
>> +    }
>> +    assert(nbd_client_connecting(s));
> 
> This assert adds nothing given the condition beforehand.
> 
>> +
>> +    /* Wait for completion of all in-flight requests */
>> +
>> +    qemu_co_mutex_lock(&s->send_mutex);
>> +
>> +    while (s->in_flight > 0) {
>> +        qemu_co_mutex_unlock(&s->send_mutex);
>> +        nbd_recv_coroutines_wake_all(s);
>> +        s->wait_in_flight = true;
>> +        qemu_coroutine_yield();
>> +        s->wait_in_flight = false;
>> +        qemu_co_mutex_lock(&s->send_mutex);
>> +    }
>> +
>> +    qemu_co_mutex_unlock(&s->send_mutex);
>> +
>> +    if (!nbd_client_connecting(s)) {
>> +        return;
>> +    }
>> +
>> +    /*
>> +     * Now we are sure that nobody is accessing the channel, and no one will
>> +     * try until we set the state to CONNECTED.
>> +     */
>> +
>> +    /* Finalize previous connection if any */
>> +    if (s->ioc) {
>> +        nbd_client_detach_aio_context(s->bs);
>> +        object_unref(OBJECT(s->sioc));
>> +        s->sioc = NULL;
>> +        object_unref(OBJECT(s->ioc));
>> +        s->ioc = NULL;
>> +    }
>> +
>> +    s->connect_status = nbd_client_connect(s->bs, &local_err);
>> +    error_free(s->connect_err);
>> +    s->connect_err = NULL;
>> +    error_propagate(&s->connect_err, local_err);
>> +    local_err = NULL;
>> +
>> +    if (s->connect_status < 0) {
>> +        /* failed attempt */
>> +        return;
>> +    }
>> +
>> +    /* successfully connected */
>> +    s->state = NBD_CLIENT_CONNECTED;
>> +    qemu_co_queue_restart_all(&s->free_sema);
>> +}
>> +
>> +static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s)
>> +{
>> +    uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>> +    uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
>> +    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
>> +    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
>> +
>> +    nbd_reconnect_attempt(s);
>> +
>> +    while (nbd_client_connecting(s)) {
>> +        if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
>> +            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
>> +        {
>> +            s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>> +            qemu_co_queue_restart_all(&s->free_sema);
>> +        }
>> +
>> +        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, timeout,
>> +                         &s->connection_co_sleep_ns_state);
>> +        if (s->drained) {
>> +            bdrv_dec_in_flight(s->bs);
>> +            s->wait_drained_end = true;
>> +            while (s->drained) {
>> +                /*
>> +                 * We may be entered once from nbd_client_attach_aio_context_bh
>> +                 * and then from nbd_client_co_drain_end. So here is a loop.
>> +                 */
>> +                qemu_coroutine_yield();
>> +            }
>> +            bdrv_inc_in_flight(s->bs);
>> +        }
>> +        if (timeout < max_timeout) {
>> +            timeout *= 2;
>> +        }
>> +
>> +        nbd_reconnect_attempt(s);
>> +    }
>>   }
>>   
>>   static coroutine_fn void nbd_connection_entry(void *opaque)
>>   {
>> -    BDRVNBDState *s = opaque;
>> +    BDRVNBDState *s = (BDRVNBDState *)opaque;
> 
> The cast is not necessary.
> 
>>       uint64_t i;
>>       int ret = 0;
>>       Error *local_err = NULL;
>> @@ -177,16 +331,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
>>            * Therefore we keep an additional in_flight reference all the time and
>>            * only drop it temporarily here.
>>            */
>> +
>> +        if (nbd_client_connecting(s)) {
>> +            nbd_reconnect_loop(s);
>> +        }
>> +
>> +        if (s->state != NBD_CLIENT_CONNECTED) {
>> +            continue;
>> +        }
>> +
>>           assert(s->reply.handle == 0);
>>           ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
>>   
>>           if (local_err) {
>>               trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
>>               error_free(local_err);
>> +            local_err = NULL;
>>           }
>>           if (ret <= 0) {
>>               nbd_channel_error(s, ret ? ret : -EIO);
>> -            break;
>> +            continue;
>>           }
>>   
>>           /*
>> @@ -201,7 +365,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
>>               (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
>>           {
>>               nbd_channel_error(s, -EINVAL);
>> -            break;
>> +            continue;
>>           }
>>   
> 
> The commit message says you re-attempt the request after reconnection if
> you have not yet timed out from the previous connection; but do you also
> need to clear out any partial reply received to make sure the new
> request isn't operating on stale assumptions left over if the server
> died between two structured chunks?


In nbd_reconnect_attempt we "Wait for completion of all in-flight requests", so
all in-flight requests are failed, and no partial progress appears at reconnect point.

> 
> 
>> @@ -927,20 +1113,26 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
>>       } else {
>>           assert(request->type != NBD_CMD_WRITE);
>>       }
>> -    ret = nbd_co_send_request(bs, request, write_qiov);
>> -    if (ret < 0) {
>> -        return ret;
>> -    }
>>   
>> -    ret = nbd_co_receive_return_code(s, request->handle,
>> -                                     &request_ret, &local_err);
>> -    if (local_err) {
>> -        trace_nbd_co_request_fail(request->from, request->len, request->handle,
>> -                                  request->flags, request->type,
>> -                                  nbd_cmd_lookup(request->type),
>> -                                  ret, error_get_pretty(local_err));
>> -        error_free(local_err);
>> -    }
>> +    do {
>> +        ret = nbd_co_send_request(bs, request, write_qiov);
>> +        if (ret < 0) {
>> +            continue;
>> +        }
>> +
>> +        ret = nbd_co_receive_return_code(s, request->handle,
>> +                                         &request_ret, &local_err);
>> +        if (local_err) {
>> +            trace_nbd_co_request_fail(request->from, request->len,
>> +                                      request->handle, request->flags,
>> +                                      request->type,
>> +                                      nbd_cmd_lookup(request->type),
>> +                                      ret, error_get_pretty(local_err));
>> +            error_free(local_err);
>> +            local_err = NULL;
>> +        }
>> +    } while (ret < 0 && nbd_client_connecting_wait(s));
> 
> I ask because nothing seems to reset request_ret here in the new loop.
> 

We don't need to reset it. It is set only on the last iterations, as if it is set ret
must be 0.

Patch
diff mbox series

diff --git a/block/nbd.c b/block/nbd.c
index beed46fb34..f272154d4b 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -1,6 +1,7 @@ 
 /*
  * QEMU Block driver for  NBD
  *
+ * Copyright (c) 2019 Virtuozzo International GmbH.
  * Copyright (C) 2016 Red Hat, Inc.
  * Copyright (C) 2008 Bull S.A.S.
  *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
@@ -55,6 +56,8 @@  typedef struct {
 } NBDClientRequest;
 
 typedef enum NBDClientState {
+    NBD_CLIENT_CONNECTING_WAIT,
+    NBD_CLIENT_CONNECTING_NOWAIT,
     NBD_CLIENT_CONNECTED,
     NBD_CLIENT_QUIT
 } NBDClientState;
@@ -67,8 +70,14 @@  typedef struct BDRVNBDState {
     CoMutex send_mutex;
     CoQueue free_sema;
     Coroutine *connection_co;
+    void *connection_co_sleep_ns_state;
+    bool drained;
+    bool wait_drained_end;
     int in_flight;
     NBDClientState state;
+    int connect_status;
+    Error *connect_err;
+    bool wait_in_flight;
 
     NBDClientRequest requests[MAX_NBD_REQUESTS];
     NBDReply reply;
@@ -83,10 +92,21 @@  typedef struct BDRVNBDState {
     char *x_dirty_bitmap;
 } BDRVNBDState;
 
-/* @ret will be used for reconnect in future */
+static int nbd_client_connect(BlockDriverState *bs, Error **errp);
+
 static void nbd_channel_error(BDRVNBDState *s, int ret)
 {
-    s->state = NBD_CLIENT_QUIT;
+    if (ret == -EIO) {
+        if (s->state == NBD_CLIENT_CONNECTED) {
+            s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
+                                            NBD_CLIENT_CONNECTING_NOWAIT;
+        }
+    } else {
+        if (s->state == NBD_CLIENT_CONNECTED) {
+            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+        }
+        s->state = NBD_CLIENT_QUIT;
+    }
 }
 
 static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
@@ -129,7 +149,13 @@  static void nbd_client_attach_aio_context(BlockDriverState *bs,
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
+    /*
+     * s->connection_co is either yielded from nbd_receive_reply or from
+     * nbd_reconnect_loop()
+     */
+    if (s->state == NBD_CLIENT_CONNECTED) {
+        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
+    }
 
     bdrv_inc_in_flight(bs);
 
@@ -140,29 +166,157 @@  static void nbd_client_attach_aio_context(BlockDriverState *bs,
     aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
 }
 
+static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
+{
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-static void nbd_teardown_connection(BlockDriverState *bs)
+    s->drained = true;
+    if (s->connection_co_sleep_ns_state) {
+        qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
+    }
+}
+
+static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    assert(s->ioc);
+    s->drained = false;
+    if (s->wait_drained_end) {
+        s->wait_drained_end = false;
+        aio_co_wake(s->connection_co);
+    }
+}
+
 
-    /* finish any pending coroutines */
-    qio_channel_shutdown(s->ioc,
-                         QIO_CHANNEL_SHUTDOWN_BOTH,
-                         NULL);
+static void nbd_teardown_connection(BlockDriverState *bs)
+{
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+    if (s->state == NBD_CLIENT_CONNECTED) {
+        /* finish any pending coroutines */
+        assert(s->ioc);
+        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    }
+    s->state = NBD_CLIENT_QUIT;
+    if (s->connection_co) {
+        if (s->connection_co_sleep_ns_state) {
+            qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
+        }
+    }
     BDRV_POLL_WHILE(bs, s->connection_co);
+}
 
-    nbd_client_detach_aio_context(bs);
-    object_unref(OBJECT(s->sioc));
-    s->sioc = NULL;
-    object_unref(OBJECT(s->ioc));
-    s->ioc = NULL;
+static bool nbd_client_connecting(BDRVNBDState *s)
+{
+    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+            s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
+
+static bool nbd_client_connecting_wait(BDRVNBDState *s)
+{
+    return s->state == NBD_CLIENT_CONNECTING_WAIT;
+}
+
+static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+{
+    Error *local_err = NULL;
+
+    if (!nbd_client_connecting(s)) {
+        return;
+    }
+    assert(nbd_client_connecting(s));
+
+    /* Wait for completion of all in-flight requests */
+
+    qemu_co_mutex_lock(&s->send_mutex);
+
+    while (s->in_flight > 0) {
+        qemu_co_mutex_unlock(&s->send_mutex);
+        nbd_recv_coroutines_wake_all(s);
+        s->wait_in_flight = true;
+        qemu_coroutine_yield();
+        s->wait_in_flight = false;
+        qemu_co_mutex_lock(&s->send_mutex);
+    }
+
+    qemu_co_mutex_unlock(&s->send_mutex);
+
+    if (!nbd_client_connecting(s)) {
+        return;
+    }
+
+    /*
+     * Now we are sure that nobody is accessing the channel, and no one will
+     * try until we set the state to CONNECTED.
+     */
+
+    /* Finalize previous connection if any */
+    if (s->ioc) {
+        nbd_client_detach_aio_context(s->bs);
+        object_unref(OBJECT(s->sioc));
+        s->sioc = NULL;
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
+    }
+
+    s->connect_status = nbd_client_connect(s->bs, &local_err);
+    error_free(s->connect_err);
+    s->connect_err = NULL;
+    error_propagate(&s->connect_err, local_err);
+    local_err = NULL;
+
+    if (s->connect_status < 0) {
+        /* failed attempt */
+        return;
+    }
+
+    /* successfully connected */
+    s->state = NBD_CLIENT_CONNECTED;
+    qemu_co_queue_restart_all(&s->free_sema);
+}
+
+static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s)
+{
+    uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+    uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
+    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
+    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
+
+    nbd_reconnect_attempt(s);
+
+    while (nbd_client_connecting(s)) {
+        if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
+        {
+            s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+            qemu_co_queue_restart_all(&s->free_sema);
+        }
+
+        qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, timeout,
+                         &s->connection_co_sleep_ns_state);
+        if (s->drained) {
+            bdrv_dec_in_flight(s->bs);
+            s->wait_drained_end = true;
+            while (s->drained) {
+                /*
+                 * We may be entered once from nbd_client_attach_aio_context_bh
+                 * and then from nbd_client_co_drain_end. So here is a loop.
+                 */
+                qemu_coroutine_yield();
+            }
+            bdrv_inc_in_flight(s->bs);
+        }
+        if (timeout < max_timeout) {
+            timeout *= 2;
+        }
+
+        nbd_reconnect_attempt(s);
+    }
 }
 
 static coroutine_fn void nbd_connection_entry(void *opaque)
 {
-    BDRVNBDState *s = opaque;
+    BDRVNBDState *s = (BDRVNBDState *)opaque;
     uint64_t i;
     int ret = 0;
     Error *local_err = NULL;
@@ -177,16 +331,26 @@  static coroutine_fn void nbd_connection_entry(void *opaque)
          * Therefore we keep an additional in_flight reference all the time and
          * only drop it temporarily here.
          */
+
+        if (nbd_client_connecting(s)) {
+            nbd_reconnect_loop(s);
+        }
+
+        if (s->state != NBD_CLIENT_CONNECTED) {
+            continue;
+        }
+
         assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
 
         if (local_err) {
             trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
             error_free(local_err);
+            local_err = NULL;
         }
         if (ret <= 0) {
             nbd_channel_error(s, ret ? ret : -EIO);
-            break;
+            continue;
         }
 
         /*
@@ -201,7 +365,7 @@  static coroutine_fn void nbd_connection_entry(void *opaque)
             (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
         {
             nbd_channel_error(s, -EINVAL);
-            break;
+            continue;
         }
 
         /*
@@ -220,10 +384,19 @@  static coroutine_fn void nbd_connection_entry(void *opaque)
         qemu_coroutine_yield();
     }
 
+    qemu_co_queue_restart_all(&s->free_sema);
     nbd_recv_coroutines_wake_all(s);
     bdrv_dec_in_flight(s->bs);
 
     s->connection_co = NULL;
+    if (s->ioc) {
+        nbd_client_detach_aio_context(s->bs);
+        object_unref(OBJECT(s->sioc));
+        s->sioc = NULL;
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
+    }
+
     aio_wait_kick();
 }
 
@@ -235,7 +408,7 @@  static int nbd_co_send_request(BlockDriverState *bs,
     int rc, i = -1;
 
     qemu_co_mutex_lock(&s->send_mutex);
-    while (s->in_flight == MAX_NBD_REQUESTS) {
+    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
@@ -286,7 +459,11 @@  err:
             s->requests[i].coroutine = NULL;
             s->in_flight--;
         }
-        qemu_co_queue_next(&s->free_sema);
+        if (s->in_flight == 0 && s->wait_in_flight) {
+            aio_co_wake(s->connection_co);
+        } else {
+            qemu_co_queue_next(&s->free_sema);
+        }
     }
     qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
@@ -666,10 +843,15 @@  static coroutine_fn int nbd_co_receive_one_chunk(
     } else {
         /* For assert at loop start in nbd_connection_entry */
         *reply = s->reply;
-        s->reply.handle = 0;
     }
+    s->reply.handle = 0;
 
-    if (s->connection_co) {
+    if (s->connection_co && !s->wait_in_flight) {
+        /*
+         * We must check s->wait_in_flight, because we may entered by
+         * nbd_recv_coroutines_wake_all(), in this case we should not
+         * wake connection_co here, it will woken by last request.
+         */
         aio_co_wake(s->connection_co);
     }
 
@@ -781,7 +963,11 @@  break_loop:
 
     qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
-    qemu_co_queue_next(&s->free_sema);
+    if (s->in_flight == 0 && s->wait_in_flight) {
+        aio_co_wake(s->connection_co);
+    } else {
+        qemu_co_queue_next(&s->free_sema);
+    }
     qemu_co_mutex_unlock(&s->send_mutex);
 
     return false;
@@ -927,20 +1113,26 @@  static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
     } else {
         assert(request->type != NBD_CMD_WRITE);
     }
-    ret = nbd_co_send_request(bs, request, write_qiov);
-    if (ret < 0) {
-        return ret;
-    }
 
-    ret = nbd_co_receive_return_code(s, request->handle,
-                                     &request_ret, &local_err);
-    if (local_err) {
-        trace_nbd_co_request_fail(request->from, request->len, request->handle,
-                                  request->flags, request->type,
-                                  nbd_cmd_lookup(request->type),
-                                  ret, error_get_pretty(local_err));
-        error_free(local_err);
-    }
+    do {
+        ret = nbd_co_send_request(bs, request, write_qiov);
+        if (ret < 0) {
+            continue;
+        }
+
+        ret = nbd_co_receive_return_code(s, request->handle,
+                                         &request_ret, &local_err);
+        if (local_err) {
+            trace_nbd_co_request_fail(request->from, request->len,
+                                      request->handle, request->flags,
+                                      request->type,
+                                      nbd_cmd_lookup(request->type),
+                                      ret, error_get_pretty(local_err));
+            error_free(local_err);
+            local_err = NULL;
+        }
+    } while (ret < 0 && nbd_client_connecting_wait(s));
+
     return ret ? ret : request_ret;
 }
 
@@ -981,20 +1173,24 @@  static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
         request.len -= slop;
     }
 
-    ret = nbd_co_send_request(bs, &request, NULL);
-    if (ret < 0) {
-        return ret;
-    }
+    do {
+        ret = nbd_co_send_request(bs, &request, NULL);
+        if (ret < 0) {
+            continue;
+        }
+
+        ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
+                                           &request_ret, &local_err);
+        if (local_err) {
+            trace_nbd_co_request_fail(request.from, request.len, request.handle,
+                                      request.flags, request.type,
+                                      nbd_cmd_lookup(request.type),
+                                      ret, error_get_pretty(local_err));
+            error_free(local_err);
+            local_err = NULL;
+        }
+    } while (ret < 0 && nbd_client_connecting_wait(s));
 
-    ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
-                                       &request_ret, &local_err);
-    if (local_err) {
-        trace_nbd_co_request_fail(request.from, request.len, request.handle,
-                                  request.flags, request.type,
-                                  nbd_cmd_lookup(request.type),
-                                  ret, error_get_pretty(local_err));
-        error_free(local_err);
-    }
     return ret ? ret : request_ret;
 }
 
@@ -1127,20 +1323,25 @@  static int coroutine_fn nbd_client_co_block_status(
     if (s->info.min_block) {
         assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
     }
-    ret = nbd_co_send_request(bs, &request, NULL);
-    if (ret < 0) {
-        return ret;
-    }
+    do {
+        ret = nbd_co_send_request(bs, &request, NULL);
+        if (ret < 0) {
+            continue;
+        }
+
+        ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
+                                               &extent, &request_ret,
+                                               &local_err);
+        if (local_err) {
+            trace_nbd_co_request_fail(request.from, request.len, request.handle,
+                                      request.flags, request.type,
+                                      nbd_cmd_lookup(request.type),
+                                      ret, error_get_pretty(local_err));
+            error_free(local_err);
+            local_err = NULL;
+        }
+    } while (ret < 0 && nbd_client_connecting_wait(s));
 
-    ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
-                                           &extent, &request_ret, &local_err);
-    if (local_err) {
-        trace_nbd_co_request_fail(request.from, request.len, request.handle,
-                                  request.flags, request.type,
-                                  nbd_cmd_lookup(request.type),
-                                  ret, error_get_pretty(local_err));
-        error_free(local_err);
-    }
     if (ret < 0 || request_ret < 0) {
         return ret ? ret : request_ret;
     }
@@ -1159,9 +1360,9 @@  static void nbd_client_close(BlockDriverState *bs)
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     NBDRequest request = { .type = NBD_CMD_DISC };
 
-    assert(s->ioc);
-
-    nbd_send_request(s->ioc, &request);
+    if (s->ioc) {
+        nbd_send_request(s->ioc, &request);
+    }
 
     nbd_teardown_connection(bs);
 }
@@ -1804,6 +2005,8 @@  static BlockDriver bdrv_nbd = {
     .bdrv_getlength             = nbd_getlength,
     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
+    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
+    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
@@ -1826,6 +2029,8 @@  static BlockDriver bdrv_nbd_tcp = {
     .bdrv_getlength             = nbd_getlength,
     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
+    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
+    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
@@ -1848,6 +2053,8 @@  static BlockDriver bdrv_nbd_unix = {
     .bdrv_getlength             = nbd_getlength,
     .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
     .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
+    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
+    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,