[v3] block/nbd: use non-blocking connect: fix vm hang on connect()

Message ID 20200812145237.4396-1-vsementsov@virtuozzo.com
State New
Series [v3] block/nbd: use non-blocking connect: fix vm hang on connect()

Commit Message

Vladimir Sementsov-Ogievskiy Aug. 12, 2020, 2:52 p.m. UTC
This makes nbd's connection_co yield during reconnects, so that
reconnect doesn't block the main thread. This is very important in
case of an unavailable nbd server host: a connect() call may take a
long time, blocking the main thread (and due to reconnect, it will
hang again and again, with small gaps of working time between
connection attempts).

Implementation notes:

 - We don't want to implement non-blocking connect() over a
 non-blocking socket, because getaddrinfo() has no portable
 non-blocking implementation anyway, so let's just use a thread for
 both getaddrinfo() and connect().

 - We can't use qio_channel_socket_connect_async() (which behaves
 similarly and starts a thread to execute the connect() call), as it
 relies on someone iterating the main loop (g_main_loop_run() or
 something like it), which is not always the case.

 - We can't use the thread_pool_submit_co() API, as the thread pool
 waits for all threads to finish (but we don't want to wait for a
 blocking reconnect attempt on shutdown).

 So, we just create the thread by hand. Some additional difficulties
 are:

 - We want our connect to avoid blocking drained sections and aio
 context switches. To achieve this, we make it possible to "cancel"
 the synchronous wait for the connect (which is actually a coroutine
 yield); still, the thread continues in the background, and if
 successful, its result may be reused on the next reconnect attempt.

 - We don't want to wait for reconnect on shutdown, so there is a
 CONNECT_THREAD_RUNNING_DETACHED thread state, which means that the
 block layer is no longer interested in the result, and the thread
 should close the newly connected socket on finish and free the state.

How to reproduce the bug fixed by this commit:

1. Create an image on node1:
   qemu-img create -f qcow2 xx 100M

2. Start an NBD server on node1:
   qemu-nbd xx

3. Start a vm with a second nbd disk on node2, like this:

  ./x86_64-softmmu/qemu-system-x86_64 -nodefaults -drive \
    file=/work/images/cent7.qcow2 -drive file=nbd+tcp://192.168.100.2 \
    -vnc :0 -qmp stdio -m 2G -enable-kvm -vga std

4. Access the vm through vnc (or some other way), and check that the
   NBD drive works:

   dd if=/dev/sdb of=/dev/null bs=1M count=10

   - the command should succeed.

5. Now, let's trigger the nbd-reconnect loop in the QEMU process. To
   do this:

5.1 Kill the NBD server on node1.

5.2 Run "dd if=/dev/sdb of=/dev/null bs=1M count=10" in the guest
    again. The command should fail, and a lot of error messages about
    the failing disk may appear as well.

    Now the NBD client driver in QEMU tries to reconnect.
    Still, the VM works well.

6. Make node1 unavailable on the NBD port, so that connect() from
   node2 will take a long time:

   On node1 (note that 10809 is just the default NBD port):

   sudo iptables -A INPUT -p tcp --dport 10809 -j DROP

   After some time the guest hangs, and you may check in gdb that
   QEMU hangs in a connect() call issued from the main thread. This
   is the BUG.

7. Don't forget to drop the iptables rule on your node1:

   sudo iptables -D INPUT -p tcp --dport 10809 -j DROP

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---

Hi!

This is a continuation of "[PATCH v2 for-5.1? 0/5] Fix nbd reconnect
dead-locks", which was mostly merged into 5.1. Only the last patch was
not merged, and here is a no-change resend for convenience.


 block/nbd.c | 266 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 265 insertions(+), 1 deletion(-)

Comments

Eric Blake Aug. 19, 2020, 2:46 p.m. UTC | #1
On 8/12/20 9:52 AM, Vladimir Sementsov-Ogievskiy wrote:
> This make nbd connection_co to yield during reconnects, so that

s/make nbd connection_co to/makes nbd's connection_co/

> reconnect doesn't hang up the main thread. This is very important in

s/hang up/block/

> case of unavailable nbd server host: connect() call may take a long

s/of/of an/

> time, blocking the main thread (and due to reconnect, it will hang
> again and again with small gaps of working time during pauses between
> connection attempts).
> 
> Realization notes:
> 
>   - We don't want to implement non-blocking connect() over non-blocking
>   socket, because getaddrinfo() doesn't have portable non-blocking
>   realization anyway, so let's just use a thread for both getaddrinfo()
>   and connect().
> 
>   - We can't use qio_channel_socket_connect_async (which behave

behaves

>   similarly and start a thread to execute connect() call), as it's rely

starts
relying

>   on someone iterating main loop (g_main_loop_run() or something like
>   this), which is not always the case.
> 
>   - We can't use thread_pool_submit_co API, as thread pool waits for all
>   threads to finish (but we don't want to wait for blocking reconnect
>   attempt on shutdown.
> 
>   So, we just create the thread by hand. Some additional difficulties
>   are:
> 
>   - We want our connect don't block drained sections and aio context

s/don't block/to avoid blocking/

>   switches. To achieve this, we make it possible to "cancel" synchronous
>   wait for the connect (which is an coroutine yield actually), still,

s/an/a/

>   the thread continues in background, and it successful result may be

s/it successful/if successful, its/

>   reused on next reconnect attempt.
> 
>   - We don't want to wait for reconnect on shutdown, so there is
>   CONNECT_THREAD_RUNNING_DETACHED thread state, which means that block
>   layer not more interested in a result, and thread should close new

which means that the block layer is no longer interested

>   connected socket on finish and free the state.
> 
> How to reproduce the bug, fixed with this commit:
> 
> 1. Create an image on node1:
>     qemu-img create -f qcow2 xx 100M
> 
> 2. Start NBD server on node1:
>     qemu-nbd xx
> 
> 3. Start vm with second nbd disk on node2, like this:
> 
>    ./x86_64-softmmu/qemu-system-x86_64 -nodefaults -drive \
>      file=/work/images/cent7.qcow2 -drive file=nbd+tcp://192.168.100.2 \
>      -vnc :0 -qmp stdio -m 2G -enable-kvm -vga std
> 
> 4. Access the vm through vnc (or some other way?), and check that NBD
>     drive works:
> 
>     dd if=/dev/sdb of=/dev/null bs=1M count=10
> 
>     - the command should succeed.
> 
> 5. Now, let's trigger nbd-reconnect loop in Qemu process. For this:
> 
> 5.1 Kill NBD server on node1
> 
> 5.2 run "dd if=/dev/sdb of=/dev/null bs=1M count=10" in the guest
>      again. The command should fail and a lot of error messages about
>      failing disk may appear as well.
> 
>      Now NBD client driver in Qemu tries to reconnect.
>      Still, VM works well.
> 
> 6. Make node1 unavailable on NBD port, so connect() from node2 will
>     last for a long time:
> 
>     On node1 (Note, that 10809 is just a default NBD port):
> 
>     sudo iptables -A INPUT -p tcp --dport 10809 -j DROP
> 
>     After some time the guest hangs, and you may check in gdb that Qemu
>     hangs in connect() call, issued from the main thread. This is the
>     BUG.
> 
> 7. Don't forget to drop iptables rule from your node1:
> 
>     sudo iptables -D INPUT -p tcp --dport 10809 -j DROP
> 
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
> ---
> 
> Hi!
> 
> This a continuation of "[PATCH v2 for-5.1? 0/5] Fix nbd reconnect dead-locks",
> which was mostly merged to 5.1. The only last patch was not merged, and
> here is a no-change resend for convenience.
> 
> 
>   block/nbd.c | 266 +++++++++++++++++++++++++++++++++++++++++++++++++++-
>   1 file changed, 265 insertions(+), 1 deletion(-)

Looks big, but the commit message goes into good detail about what the
problem is, why the solution takes the approach it does, and gives a
good formula for reproduction.

> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 7bb881fef4..919ec5e573 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -38,6 +38,7 @@
>   
>   #include "qapi/qapi-visit-sockets.h"
>   #include "qapi/qmp/qstring.h"
> +#include "qapi/clone-visitor.h"
>   
>   #include "block/qdict.h"
>   #include "block/nbd.h"
> @@ -62,6 +63,47 @@ typedef enum NBDClientState {
>       NBD_CLIENT_QUIT
>   } NBDClientState;
>   
> +typedef enum NBDConnectThreadState {
> +/* No thread, no pending results */
> +    CONNECT_THREAD_NONE,

I'd indent the comments by four spaces, to line up with the enumeration 
values they describe.
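
That is, same content, just re-indented:

    typedef enum NBDConnectThreadState {
        /* No thread, no pending results */
        CONNECT_THREAD_NONE,

        /* Thread is running, no results for now */
        CONNECT_THREAD_RUNNING,

        /*
         * Thread is running, but requestor exited. Thread should close the
         * new socket and free the connect state on exit.
         */
        CONNECT_THREAD_RUNNING_DETACHED,

        /* Thread finished, results are stored in a state */
        CONNECT_THREAD_FAIL,
        CONNECT_THREAD_SUCCESS
    } NBDConnectThreadState;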

> +
> +/* Thread is running, no results for now */
> +    CONNECT_THREAD_RUNNING,
> +
> +/*
> + * Thread is running, but requestor exited. Thread should close the new socket
> + * and free the connect state on exit.
> + */
> +    CONNECT_THREAD_RUNNING_DETACHED,
> +
> +/* Thread finished, results are stored in a state */
> +    CONNECT_THREAD_FAIL,
> +    CONNECT_THREAD_SUCCESS
> +} NBDConnectThreadState;
> +
> +typedef struct NBDConnectThread {
> +    /* Initialization constants */
> +    SocketAddress *saddr; /* address to connect to */
> +    /*
> +     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
> +     * NULL
> +     */
> +    QEMUBHFunc *bh_func;
> +    void *bh_opaque;
> +
> +    /*
> +     * Result of last attempt. Valid in FAIL and SUCCESS states.
> +     * If you want to steal error, don't forget to set pointer to NULL.
> +     */
> +    QIOChannelSocket *sioc;
> +    Error *err;
> +
> +    /* state and bh_ctx are protected by mutex */
> +    QemuMutex mutex;
> +    NBDConnectThreadState state; /* current state of the thread */
> +    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
> +} NBDConnectThread;

Looks reasonable.

> @@ -246,6 +298,216 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
>       return s->state == NBD_CLIENT_CONNECTING_WAIT;
>   }
>   
> +static void connect_bh(void *opaque)
> +{
> +    BDRVNBDState *state = opaque;
> +
> +    assert(state->wait_connect);
> +    state->wait_connect = false;
> +    aio_co_wake(state->connection_co);
> +}
> +
> +static void nbd_init_connect_thread(BDRVNBDState *s)
> +{
> +    s->connect_thread = g_new(NBDConnectThread, 1);
> +
> +    *s->connect_thread = (NBDConnectThread) {
> +        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
> +        .state = CONNECT_THREAD_NONE,
> +        .bh_func = connect_bh,
> +        .bh_opaque = s
> +    };

I prefer using trailing commas in initializer lists (less churn if a 
later patch needs to initialize another member)
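
That is:

    *s->connect_thread = (NBDConnectThread) {
        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
        .state = CONNECT_THREAD_NONE,
        .bh_func = connect_bh,
        .bh_opaque = s,
    };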

> +
> +    qemu_mutex_init(&s->connect_thread->mutex);
> +}
> +
> +static void nbd_free_connect_thread(NBDConnectThread *thr)
> +{
> +    if (thr->sioc) {
> +        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
> +    }
> +    error_free(thr->err);
> +    qapi_free_SocketAddress(thr->saddr);
> +    g_free(thr);
> +}
> +
> +static void *connect_thread_func(void *opaque)
> +{
> +    NBDConnectThread *thr = opaque;
> +    int ret;
> +    bool do_free = false;
> +
> +    thr->sioc = qio_channel_socket_new();
> +
> +    error_free(thr->err);
> +    thr->err = NULL;

Why do we need to clear the error at startup?  Shouldn't it already 
start life as NULL?

> +    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
> +    if (ret < 0) {
> +        object_unref(OBJECT(thr->sioc));
> +        thr->sioc = NULL;
> +    }
> +
> +    qemu_mutex_lock(&thr->mutex);
> +
> +    switch (thr->state) {
> +    case CONNECT_THREAD_RUNNING:
> +        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
> +        if (thr->bh_ctx) {
> +            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
> +
> +            /* play safe, don't reuse bh_ctx on further connection attempts */
> +            thr->bh_ctx = NULL;
> +        }
> +        break;
> +    case CONNECT_THREAD_RUNNING_DETACHED:
> +        do_free = true;
> +        break;
> +    default:
> +        abort();
> +    }
> +
> +    qemu_mutex_unlock(&thr->mutex);
> +
> +    if (do_free) {
> +        nbd_free_connect_thread(thr);
> +    }
> +
> +    return NULL;
> +}
> +
> +static QIOChannelSocket *coroutine_fn
> +nbd_co_establish_connection(BlockDriverState *bs, Error **errp)

There's some inconsistency on whether you use:

return_type name()

or

return_type
name()

It's aesthetic, but it would be nice to stick to one (I prefer the 
second, but see that the qemu code base as a whole is inconsistent, so 
the best we can do is stay consistent within a file)

> +{
> +    QemuThread thread;
> +    BDRVNBDState *s = bs->opaque;
> +    QIOChannelSocket *res;
> +    NBDConnectThread *thr = s->connect_thread;
> +
> +    qemu_mutex_lock(&thr->mutex);

Should we use the auto-cleanup macro magic here? ...

> +
> +    switch (thr->state) {
> +    case CONNECT_THREAD_FAIL:
> +    case CONNECT_THREAD_NONE:
> +        error_free(thr->err);
> +        thr->err = NULL;
> +        thr->state = CONNECT_THREAD_RUNNING;
> +        qemu_thread_create(&thread, "nbd-connect",
> +                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
> +        break;
> +    case CONNECT_THREAD_SUCCESS:
> +        /* Previous attempt finally succeeded in background */
> +        thr->state = CONNECT_THREAD_NONE;
> +        res = thr->sioc;
> +        thr->sioc = NULL;
> +        qemu_mutex_unlock(&thr->mutex);
> +        return res;
> +    case CONNECT_THREAD_RUNNING:
> +        /* Already running, will wait */
> +        break;
> +    default:
> +        abort();
> +    }
> +
> +    thr->bh_ctx = qemu_get_current_aio_context();
> +
> +    qemu_mutex_unlock(&thr->mutex);
> +
> +

...to do so, you'd put the above in a nested scope, where the mutex is 
released when the scope is exited.
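
Untested sketch of what I mean, using WITH_QEMU_LOCK_GUARD() from
"qemu/lockable.h" (the guard drops the mutex on any exit from the
block, including the early return):

    WITH_QEMU_LOCK_GUARD(&thr->mutex) {
        switch (thr->state) {
        case CONNECT_THREAD_FAIL:
        case CONNECT_THREAD_NONE:
            error_free(thr->err);
            thr->err = NULL;
            thr->state = CONNECT_THREAD_RUNNING;
            qemu_thread_create(&thread, "nbd-connect",
                               connect_thread_func, thr, QEMU_THREAD_DETACHED);
            break;
        case CONNECT_THREAD_SUCCESS:
            /* Previous attempt finally succeeded in background */
            thr->state = CONNECT_THREAD_NONE;
            res = thr->sioc;
            thr->sioc = NULL;
            return res; /* mutex released by the guard */
        case CONNECT_THREAD_RUNNING:
            /* Already running, will wait */
            break;
        default:
            abort();
        }

        thr->bh_ctx = qemu_get_current_aio_context();
    }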

> +    /*
> +     * We are going to wait for connect-thread finish, but
> +     * nbd_client_co_drain_begin() can interrupt.
> +     *
> +     * Note that wait_connect variable is not visible for connect-thread. It
> +     * doesn't need mutex protection, it used only inside home aio context of
> +     * bs.
> +     */
> +    s->wait_connect = true;
> +    qemu_coroutine_yield();
> +
> +    qemu_mutex_lock(&thr->mutex);
> +
> +    switch (thr->state) {
> +    case CONNECT_THREAD_SUCCESS:
> +    case CONNECT_THREAD_FAIL:
> +        thr->state = CONNECT_THREAD_NONE;
> +        error_propagate(errp, thr->err);
> +        thr->err = NULL;
> +        res = thr->sioc;
> +        thr->sioc = NULL;
> +        break;
> +    case CONNECT_THREAD_RUNNING:
> +    case CONNECT_THREAD_RUNNING_DETACHED:
> +        /*
> +         * Obviously, drained section wants to start. Report the attempt as
> +         * failed. Still connect thread is executing in background, and its
> +         * result may be used for next connection attempt.
> +         */
> +        res = NULL;
> +        error_setg(errp, "Connection attempt cancelled by other operation");
> +        break;
> +
> +    case CONNECT_THREAD_NONE:
> +        /*
> +         * Impossible. We've seen this thread running. So it should be
> +         * running or at least give some results.
> +         */
> +        abort();
> +
> +    default:
> +        abort();
> +    }
> +
> +    qemu_mutex_unlock(&thr->mutex);
> +
> +    return res;
> +}

Looks sensible.

> +
> +/*
> + * nbd_co_establish_connection_cancel
> + * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
> + * allow drained section to begin.
> + *
> + * If detach is true, also cleanup the state (or if thread is running, move it
> + * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
> + * detach is true.
> + */
> +static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
> +                                               bool detach)
> +{
> +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

Is the cast necessary here?

> +    NBDConnectThread *thr = s->connect_thread;
> +    bool wake = false;
> +    bool do_free = false;
> +
> +    qemu_mutex_lock(&thr->mutex);
> +
> +    if (thr->state == CONNECT_THREAD_RUNNING) {
> +        /* We can cancel only in running state, when bh is not yet scheduled */
> +        thr->bh_ctx = NULL;
> +        if (s->wait_connect) {
> +            s->wait_connect = false;
> +            wake = true;
> +        }
> +        if (detach) {
> +            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
> +            s->connect_thread = NULL;
> +        }
> +    } else if (detach) {
> +        do_free = true;
> +    }
> +
> +    qemu_mutex_unlock(&thr->mutex);
> +
> +    if (do_free) {
> +        nbd_free_connect_thread(thr);
> +        s->connect_thread = NULL;
> +    }
> +
> +    if (wake) {
> +        aio_co_wake(s->connection_co);
> +    }
> +}
> +
>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>   {
>       int ret;
> @@ -289,7 +551,7 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>           s->ioc = NULL;
>       }
>   
> -    sioc = nbd_establish_connection(s->saddr, &local_err);
> +    sioc = nbd_co_establish_connection(s->bs, &local_err);
>       if (!sioc) {
>           ret = -ECONNREFUSED;
>           goto out;
> @@ -1946,6 +2208,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
>       /* successfully connected */
>       s->state = NBD_CLIENT_CONNECTED;
>   
> +    nbd_init_connect_thread(s);
> +
>       s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
>       bdrv_inc_in_flight(bs);
>       aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
> 

Overall this looks good to me.  I still want to run some tests 
(including playing with your reproducer formula), but code-wise, I can 
offer:

Reviewed-by: Eric Blake <eblake@redhat.com>

I pointed out a number of grammar and format touchups; I don't mind 
applying those locally, if there is no other reason to send a v4.
Eric Blake Aug. 19, 2020, 5:52 p.m. UTC | #2
On 8/12/20 9:52 AM, Vladimir Sementsov-Ogievskiy wrote:
> This make nbd connection_co to yield during reconnects, so that
> reconnect doesn't hang up the main thread. This is very important in
> case of unavailable nbd server host: connect() call may take a long
> time, blocking the main thread (and due to reconnect, it will hang
> again and again with small gaps of working time during pauses between
> connection attempts).
> 

> How to reproduce the bug, fixed with this commit:
> 
> 1. Create an image on node1:
>     qemu-img create -f qcow2 xx 100M
> 
> 2. Start NBD server on node1:
>     qemu-nbd xx
> 
> 3. Start vm with second nbd disk on node2, like this:
> 
>    ./x86_64-softmmu/qemu-system-x86_64 -nodefaults -drive \
>      file=/work/images/cent7.qcow2 -drive file=nbd+tcp://192.168.100.2 \
>      -vnc :0 -qmp stdio -m 2G -enable-kvm -vga std

Where is the configuration to set up retry on the nbd connection?  I 
wonder if you have a non-upstream patch that turns it on by default in 
your builds; for upstream, I would have expected something more along 
the lines of -blockdev 
driver=nbd,reconnect-delay=20,server.type=inet,server.data.hostname=192.168.100.2,server.data.port=10809 
(typing off the top of my head, rather than actually tested).

> 
> 4. Access the vm through vnc (or some other way?), and check that NBD
>     drive works:
> 
>     dd if=/dev/sdb of=/dev/null bs=1M count=10
> 
>     - the command should succeed.
> 
> 5. Now, let's trigger nbd-reconnect loop in Qemu process. For this:
> 
> 5.1 Kill NBD server on node1
> 
> 5.2 run "dd if=/dev/sdb of=/dev/null bs=1M count=10" in the guest
>      again. The command should fail and a lot of error messages about
>      failing disk may appear as well.

Why does the guest access fail when the server goes away?  Shouldn't the 
pending guest requests merely be queued for retry (where the guest has 
not seen a failure yet, but may do so if timeouts are reached), rather 
than being instant errors?

> 
>      Now NBD client driver in Qemu tries to reconnect.
>      Still, VM works well.
> 
> 6. Make node1 unavailable on NBD port, so connect() from node2 will
>     last for a long time:
> 
>     On node1 (Note, that 10809 is just a default NBD port):
> 
>     sudo iptables -A INPUT -p tcp --dport 10809 -j DROP
> 
>     After some time the guest hangs, and you may check in gdb that Qemu
>     hangs in connect() call, issued from the main thread. This is the
>     BUG.
> 
> 7. Don't forget to drop iptables rule from your node1:
> 
>     sudo iptables -D INPUT -p tcp --dport 10809 -j DROP
>
Vladimir Sementsov-Ogievskiy Aug. 20, 2020, 10:27 a.m. UTC | #3
19.08.2020 17:46, Eric Blake wrote:
> On 8/12/20 9:52 AM, Vladimir Sementsov-Ogievskiy wrote:
>> This make nbd connection_co to yield during reconnects, so that
> 
> s/make nbd connection_co to/makes nbd's connection_co/
> 
>> reconnect doesn't hang up the main thread. This is very important in
> 
> s/hang up/block/
> 
>> case of unavailable nbd server host: connect() call may take a long
> 
> s/of/of an/
> 
>> time, blocking the main thread (and due to reconnect, it will hang
>> again and again with small gaps of working time during pauses between
>> connection attempts).
>>
>> Realization notes:
>>
>>   - We don't want to implement non-blocking connect() over non-blocking
>>   socket, because getaddrinfo() doesn't have portable non-blocking
>>   realization anyway, so let's just use a thread for both getaddrinfo()
>>   and connect().
>>
>>   - We can't use qio_channel_socket_connect_async (which behave
> 
> behaves
> 
>>   similarly and start a thread to execute connect() call), as it's rely
> 
> starts
> relying
> 
>>   on someone iterating main loop (g_main_loop_run() or something like
>>   this), which is not always the case.
>>
>>   - We can't use thread_pool_submit_co API, as thread pool waits for all
>>   threads to finish (but we don't want to wait for blocking reconnect
>>   attempt on shutdown.
>>
>>   So, we just create the thread by hand. Some additional difficulties
>>   are:
>>
>>   - We want our connect don't block drained sections and aio context
> 
> s/don't block/to avoid blocking/
> 
>>   switches. To achieve this, we make it possible to "cancel" synchronous
>>   wait for the connect (which is an coroutine yield actually), still,
> 
> s/an/a/
> 
>>   the thread continues in background, and it successful result may be
> 
> s/it successful/if successful, its/
> 
>>   reused on next reconnect attempt.
>>
>>   - We don't want to wait for reconnect on shutdown, so there is
>>   CONNECT_THREAD_RUNNING_DETACHED thread state, which means that block
>>   layer not more interested in a result, and thread should close new
> 
> which means that the block layer is no longer interested
> 
>>   connected socket on finish and free the state.
>>
>> How to reproduce the bug, fixed with this commit:
>>
>> 1. Create an image on node1:
>>     qemu-img create -f qcow2 xx 100M
>>
>> 2. Start NBD server on node1:
>>     qemu-nbd xx
>>
>> 3. Start vm with second nbd disk on node2, like this:
>>
>>    ./x86_64-softmmu/qemu-system-x86_64 -nodefaults -drive \
>>      file=/work/images/cent7.qcow2 -drive file=nbd+tcp://192.168.100.2 \
>>      -vnc :0 -qmp stdio -m 2G -enable-kvm -vga std
>>
>> 4. Access the vm through vnc (or some other way?), and check that NBD
>>     drive works:
>>
>>     dd if=/dev/sdb of=/dev/null bs=1M count=10
>>
>>     - the command should succeed.
>>
>> 5. Now, let's trigger nbd-reconnect loop in Qemu process. For this:
>>
>> 5.1 Kill NBD server on node1
>>
>> 5.2 run "dd if=/dev/sdb of=/dev/null bs=1M count=10" in the guest
>>      again. The command should fail and a lot of error messages about
>>      failing disk may appear as well.
>>
>>      Now NBD client driver in Qemu tries to reconnect.
>>      Still, VM works well.
>>
>> 6. Make node1 unavailable on NBD port, so connect() from node2 will
>>     last for a long time:
>>
>>     On node1 (Note, that 10809 is just a default NBD port):
>>
>>     sudo iptables -A INPUT -p tcp --dport 10809 -j DROP
>>
>>     After some time the guest hangs, and you may check in gdb that Qemu
>>     hangs in connect() call, issued from the main thread. This is the
>>     BUG.
>>
>> 7. Don't forget to drop iptables rule from your node1:
>>
>>     sudo iptables -D INPUT -p tcp --dport 10809 -j DROP
>>
>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
>> ---
>>
>> Hi!
>>
>> This a continuation of "[PATCH v2 for-5.1? 0/5] Fix nbd reconnect dead-locks",
>> which was mostly merged to 5.1. The only last patch was not merged, and
>> here is a no-change resend for convenience.
>>
>>
>>   block/nbd.c | 266 +++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 265 insertions(+), 1 deletion(-)
> 
> Looks big, but the commit message goes into good detail about what the problem is, why the solution takes the approach it does, and a good formula for reproduction.
> 
>>
>> diff --git a/block/nbd.c b/block/nbd.c
>> index 7bb881fef4..919ec5e573 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -38,6 +38,7 @@
>>   #include "qapi/qapi-visit-sockets.h"
>>   #include "qapi/qmp/qstring.h"
>> +#include "qapi/clone-visitor.h"
>>   #include "block/qdict.h"
>>   #include "block/nbd.h"
>> @@ -62,6 +63,47 @@ typedef enum NBDClientState {
>>       NBD_CLIENT_QUIT
>>   } NBDClientState;
>> +typedef enum NBDConnectThreadState {
>> +/* No thread, no pending results */
>> +    CONNECT_THREAD_NONE,
> 
> I'd indent the comments by four spaces, to line up with the enumeration values they describe.
> 
>> +
>> +/* Thread is running, no results for now */
>> +    CONNECT_THREAD_RUNNING,
>> +
>> +/*
>> + * Thread is running, but requestor exited. Thread should close the new socket
>> + * and free the connect state on exit.
>> + */
>> +    CONNECT_THREAD_RUNNING_DETACHED,
>> +
>> +/* Thread finished, results are stored in a state */
>> +    CONNECT_THREAD_FAIL,
>> +    CONNECT_THREAD_SUCCESS
>> +} NBDConnectThreadState;
>> +
>> +typedef struct NBDConnectThread {
>> +    /* Initialization constants */
>> +    SocketAddress *saddr; /* address to connect to */
>> +    /*
>> +     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
>> +     * NULL
>> +     */
>> +    QEMUBHFunc *bh_func;
>> +    void *bh_opaque;
>> +
>> +    /*
>> +     * Result of last attempt. Valid in FAIL and SUCCESS states.
>> +     * If you want to steal error, don't forget to set pointer to NULL.
>> +     */
>> +    QIOChannelSocket *sioc;
>> +    Error *err;
>> +
>> +    /* state and bh_ctx are protected by mutex */
>> +    QemuMutex mutex;
>> +    NBDConnectThreadState state; /* current state of the thread */
>> +    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
>> +} NBDConnectThread;
> 
> Looks reasonable.
> 
>> @@ -246,6 +298,216 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
>>       return s->state == NBD_CLIENT_CONNECTING_WAIT;
>>   }
>> +static void connect_bh(void *opaque)
>> +{
>> +    BDRVNBDState *state = opaque;
>> +
>> +    assert(state->wait_connect);
>> +    state->wait_connect = false;
>> +    aio_co_wake(state->connection_co);
>> +}
>> +
>> +static void nbd_init_connect_thread(BDRVNBDState *s)
>> +{
>> +    s->connect_thread = g_new(NBDConnectThread, 1);
>> +
>> +    *s->connect_thread = (NBDConnectThread) {
>> +        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
>> +        .state = CONNECT_THREAD_NONE,
>> +        .bh_func = connect_bh,
>> +        .bh_opaque = s
>> +    };
> 
> I prefer using trailing commas in initializer lists (less churn if a later patch needs to initialize another member)
> 
>> +
>> +    qemu_mutex_init(&s->connect_thread->mutex);
>> +}
>> +
>> +static void nbd_free_connect_thread(NBDConnectThread *thr)
>> +{
>> +    if (thr->sioc) {
>> +        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
>> +    }
>> +    error_free(thr->err);
>> +    qapi_free_SocketAddress(thr->saddr);
>> +    g_free(thr);
>> +}
>> +
>> +static void *connect_thread_func(void *opaque)
>> +{
>> +    NBDConnectThread *thr = opaque;
>> +    int ret;
>> +    bool do_free = false;
>> +
>> +    thr->sioc = qio_channel_socket_new();
>> +
>> +    error_free(thr->err);
>> +    thr->err = NULL;
> 
> Why do we need to clear the error at startup?  Shouldn't it already be created as starting life NULL?

Thread lifetime is one connection attempt. On the next connection attempt the thread is restarted, so it should discard the previous error.

> 
>> +    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
>> +    if (ret < 0) {
>> +        object_unref(OBJECT(thr->sioc));
>> +        thr->sioc = NULL;
>> +    }
>> +
>> +    qemu_mutex_lock(&thr->mutex);
>> +
>> +    switch (thr->state) {
>> +    case CONNECT_THREAD_RUNNING:
>> +        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
>> +        if (thr->bh_ctx) {
>> +            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
>> +
>> +            /* play safe, don't reuse bh_ctx on further connection attempts */
>> +            thr->bh_ctx = NULL;
>> +        }
>> +        break;
>> +    case CONNECT_THREAD_RUNNING_DETACHED:
>> +        do_free = true;
>> +        break;
>> +    default:
>> +        abort();
>> +    }
>> +
>> +    qemu_mutex_unlock(&thr->mutex);
>> +
>> +    if (do_free) {
>> +        nbd_free_connect_thread(thr);
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +static QIOChannelSocket *coroutine_fn
>> +nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
> 
> There's some inconsistency on whether you use:
> 
> return_type name()
> 
> or
> 
> return_type
> name()
> 
> It's aesthetic, but it would be nice to stick to one (I prefer the second, but see that the qemu code base as a whole is inconsistent, so the best we can do is stay consistent within a file)
> 
>> +{
>> +    QemuThread thread;
>> +    BDRVNBDState *s = bs->opaque;
>> +    QIOChannelSocket *res;
>> +    NBDConnectThread *thr = s->connect_thread;
>> +
>> +    qemu_mutex_lock(&thr->mutex);
> 
> Should we use the auto-cleanup macro magic here? ...
> 
>> +
>> +    switch (thr->state) {
>> +    case CONNECT_THREAD_FAIL:
>> +    case CONNECT_THREAD_NONE:
>> +        error_free(thr->err);
>> +        thr->err = NULL;
>> +        thr->state = CONNECT_THREAD_RUNNING;
>> +        qemu_thread_create(&thread, "nbd-connect",
>> +                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
>> +        break;
>> +    case CONNECT_THREAD_SUCCESS:
>> +        /* Previous attempt finally succeeded in background */
>> +        thr->state = CONNECT_THREAD_NONE;
>> +        res = thr->sioc;
>> +        thr->sioc = NULL;
>> +        qemu_mutex_unlock(&thr->mutex);
>> +        return res;
>> +    case CONNECT_THREAD_RUNNING:
>> +        /* Already running, will wait */
>> +        break;
>> +    default:
>> +        abort();
>> +    }
>> +
>> +    thr->bh_ctx = qemu_get_current_aio_context();
>> +
>> +    qemu_mutex_unlock(&thr->mutex);
>> +
>> +
> 
> ...to do so, you'd put the above in a nested scope, where the mutex is released when the scope is exited.
> 
>> +    /*
>> +     * We are going to wait for connect-thread finish, but
>> +     * nbd_client_co_drain_begin() can interrupt.
>> +     *
>> +     * Note that wait_connect variable is not visible for connect-thread. It
>> +     * doesn't need mutex protection, it used only inside home aio context of
>> +     * bs.
>> +     */
>> +    s->wait_connect = true;
>> +    qemu_coroutine_yield();
>> +
>> +    qemu_mutex_lock(&thr->mutex);
>> +
>> +    switch (thr->state) {
>> +    case CONNECT_THREAD_SUCCESS:
>> +    case CONNECT_THREAD_FAIL:
>> +        thr->state = CONNECT_THREAD_NONE;
>> +        error_propagate(errp, thr->err);
>> +        thr->err = NULL;
>> +        res = thr->sioc;
>> +        thr->sioc = NULL;
>> +        break;
>> +    case CONNECT_THREAD_RUNNING:
>> +    case CONNECT_THREAD_RUNNING_DETACHED:
>> +        /*
>> +         * Obviously, drained section wants to start. Report the attempt as
>> +         * failed. Still connect thread is executing in background, and its
>> +         * result may be used for next connection attempt.
>> +         */
>> +        res = NULL;
>> +        error_setg(errp, "Connection attempt cancelled by other operation");
>> +        break;
>> +
>> +    case CONNECT_THREAD_NONE:
>> +        /*
>> +         * Impossible. We've seen this thread running. So it should be
>> +         * running or at least give some results.
>> +         */
>> +        abort();
>> +
>> +    default:
>> +        abort();
>> +    }
>> +
>> +    qemu_mutex_unlock(&thr->mutex);
>> +
>> +    return res;
>> +}
> 
> Looks sensible.
> 
>> +
>> +/*
>> + * nbd_co_establish_connection_cancel
>> + * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
>> + * allow drained section to begin.
>> + *
>> + * If detach is true, also cleanup the state (or if thread is running, move it
>> + * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
>> + * detach is true.
>> + */
>> +static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
>> +                                               bool detach)
>> +{
>> +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> 
> Is the cast necessary here?

No, better to remove it
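
i.e. just match nbd_co_establish_connection():

    BDRVNBDState *s = bs->opaque;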

> 
>> +    NBDConnectThread *thr = s->connect_thread;
>> +    bool wake = false;
>> +    bool do_free = false;
>> +
>> +    qemu_mutex_lock(&thr->mutex);
>> +
>> +    if (thr->state == CONNECT_THREAD_RUNNING) {
>> +        /* We can cancel only in running state, when bh is not yet scheduled */
>> +        thr->bh_ctx = NULL;
>> +        if (s->wait_connect) {
>> +            s->wait_connect = false;
>> +            wake = true;
>> +        }
>> +        if (detach) {
>> +            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
>> +            s->connect_thread = NULL;
>> +        }
>> +    } else if (detach) {
>> +        do_free = true;
>> +    }
>> +
>> +    qemu_mutex_unlock(&thr->mutex);
>> +
>> +    if (do_free) {
>> +        nbd_free_connect_thread(thr);
>> +        s->connect_thread = NULL;
>> +    }
>> +
>> +    if (wake) {
>> +        aio_co_wake(s->connection_co);
>> +    }
>> +}
>> +
>>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>>   {
>>       int ret;
>> @@ -289,7 +551,7 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>>           s->ioc = NULL;
>>       }
>> -    sioc = nbd_establish_connection(s->saddr, &local_err);
>> +    sioc = nbd_co_establish_connection(s->bs, &local_err);
>>       if (!sioc) {
>>           ret = -ECONNREFUSED;
>>           goto out;
>> @@ -1946,6 +2208,8 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
>>       /* successfully connected */
>>       s->state = NBD_CLIENT_CONNECTED;
>> +    nbd_init_connect_thread(s);
>> +
>>       s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
>>       bdrv_inc_in_flight(bs);
>>       aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
>>
> 
> Overall this looks good to me.  I still want to run some tests (including playing with your reproducer formula), but code-wise, I can offer:
> 
> Reviewed-by: Eric Blake <eblake@redhat.com>
> 
> I pointed out a number of grammar and format touchups; I don't mind applying those locally, if there is no other reason to send a v4.
> 

Thanks!
Vladimir Sementsov-Ogievskiy Aug. 20, 2020, 10:31 a.m. UTC | #4
19.08.2020 20:52, Eric Blake wrote:
> On 8/12/20 9:52 AM, Vladimir Sementsov-Ogievskiy wrote:
>> This make nbd connection_co to yield during reconnects, so that
>> reconnect doesn't hang up the main thread. This is very important in
>> case of unavailable nbd server host: connect() call may take a long
>> time, blocking the main thread (and due to reconnect, it will hang
>> again and again with small gaps of working time during pauses between
>> connection attempts).
>>
> 
>> How to reproduce the bug, fixed with this commit:
>>
>> 1. Create an image on node1:
>>     qemu-img create -f qcow2 xx 100M
>>
>> 2. Start NBD server on node1:
>>     qemu-nbd xx
>>
>> 3. Start vm with second nbd disk on node2, like this:
>>
>>    ./x86_64-softmmu/qemu-system-x86_64 -nodefaults -drive \
>>      file=/work/images/cent7.qcow2 -drive file=nbd+tcp://192.168.100.2 \
>>      -vnc :0 -qmp stdio -m 2G -enable-kvm -vga std
> 
> Where is the configuration to set up retry on the nbd connection?  I wonder if you have a non-upstream patch that turns it on by default in your builds; for upstream, I would have expected something more along the lines of -blockdev driver=nbd,reconnect-delay=20,server.type=inet,server.data.hostname=192.168.100.2,server.data.port=10809 (typing off the top of my head, rather than actually tested).

No, it's not necessary: reconnect is always enabled. reconnect-delay just says what to do with guest requests while the connection is down. By default, they just fail immediately. But even with reconnect-delay=0 the reconnect code works and tries to reestablish the connection.

> 
>>
>> 4. Access the vm through vnc (or some other way?), and check that NBD
>>     drive works:
>>
>>     dd if=/dev/sdb of=/dev/null bs=1M count=10
>>
>>     - the command should succeed.
>>
>> 5. Now, let's trigger nbd-reconnect loop in Qemu process. For this:
>>
>> 5.1 Kill NBD server on node1
>>
>> 5.2 run "dd if=/dev/sdb of=/dev/null bs=1M count=10" in the guest
>>      again. The command should fail and a lot of error messages about
>>      failing disk may appear as well.
> 
> Why does the guest access fail when the server goes away?  Shouldn't the pending guest requests merely be queued for retry (where the guest has not seen a failure yet, but may do so if timeouts are reached), rather than being instant errors?

And that's exactly how it should work when reconnect-delay is 0. If you set reconnect-delay to be > 0, then for that period of time after a connection failure is detected, all requests will be queued.

> 
>>
>>      Now NBD client driver in Qemu tries to reconnect.
>>      Still, VM works well.
>>
>> 6. Make node1 unavailable on NBD port, so connect() from node2 will
>>     last for a long time:
>>
>>     On node1 (Note, that 10809 is just a default NBD port):
>>
>>     sudo iptables -A INPUT -p tcp --dport 10809 -j DROP
>>
>>     After some time the guest hangs, and you may check in gdb that Qemu
>>     hangs in connect() call, issued from the main thread. This is the
>>     BUG.
>>
>> 7. Don't forget to drop iptables rule from your node1:
>>
>>     sudo iptables -D INPUT -p tcp --dport 10809 -j DROP
>>
>
Patch

diff --git a/block/nbd.c b/block/nbd.c
index 7bb881fef4..919ec5e573 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -38,6 +38,7 @@ 
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
+#include "qapi/clone-visitor.h"
 
 #include "block/qdict.h"
 #include "block/nbd.h"
@@ -62,6 +63,47 @@  typedef enum NBDClientState {
     NBD_CLIENT_QUIT
 } NBDClientState;
 
+typedef enum NBDConnectThreadState {
+/* No thread, no pending results */
+    CONNECT_THREAD_NONE,
+
+/* Thread is running, no results for now */
+    CONNECT_THREAD_RUNNING,
+
+/*
+ * Thread is running, but requestor exited. Thread should close the new socket
+ * and free the connect state on exit.
+ */
+    CONNECT_THREAD_RUNNING_DETACHED,
+
+/* Thread finished, results are stored in a state */
+    CONNECT_THREAD_FAIL,
+    CONNECT_THREAD_SUCCESS
+} NBDConnectThreadState;
+
+typedef struct NBDConnectThread {
+    /* Initialization constants */
+    SocketAddress *saddr; /* address to connect to */
+    /*
+     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
+     * NULL
+     */
+    QEMUBHFunc *bh_func;
+    void *bh_opaque;
+
+    /*
+     * Result of last attempt. Valid in FAIL and SUCCESS states.
+     * If you want to steal error, don't forget to set pointer to NULL.
+     */
+    QIOChannelSocket *sioc;
+    Error *err;
+
+    /* state and bh_ctx are protected by mutex */
+    QemuMutex mutex;
+    NBDConnectThreadState state; /* current state of the thread */
+    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
+} NBDConnectThread;
+
 typedef struct BDRVNBDState {
     QIOChannelSocket *sioc; /* The master data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -91,10 +133,17 @@  typedef struct BDRVNBDState {
     QCryptoTLSCreds *tlscreds;
     const char *hostname;
     char *x_dirty_bitmap;
+
+    bool wait_connect;
+    NBDConnectThread *connect_thread;
 } BDRVNBDState;
 
 static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                   Error **errp);
+static QIOChannelSocket *nbd_co_establish_connection(BlockDriverState *bs,
+                                                     Error **errp);
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach);
 static int nbd_client_handshake(BlockDriverState *bs, QIOChannelSocket *sioc,
                                 Error **errp);
 
@@ -191,6 +240,8 @@  static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
     if (s->connection_co_sleep_ns_state) {
         qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
     }
+
+    nbd_co_establish_connection_cancel(bs, false);
 }
 
 static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
@@ -223,6 +274,7 @@  static void nbd_teardown_connection(BlockDriverState *bs)
         if (s->connection_co_sleep_ns_state) {
             qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
         }
+        nbd_co_establish_connection_cancel(bs, true);
     }
     if (qemu_in_coroutine()) {
         s->teardown_co = qemu_coroutine_self();
@@ -246,6 +298,216 @@  static bool nbd_client_connecting_wait(BDRVNBDState *s)
     return s->state == NBD_CLIENT_CONNECTING_WAIT;
 }
 
+static void connect_bh(void *opaque)
+{
+    BDRVNBDState *state = opaque;
+
+    assert(state->wait_connect);
+    state->wait_connect = false;
+    aio_co_wake(state->connection_co);
+}
+
+static void nbd_init_connect_thread(BDRVNBDState *s)
+{
+    s->connect_thread = g_new(NBDConnectThread, 1);
+
+    *s->connect_thread = (NBDConnectThread) {
+        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
+        .state = CONNECT_THREAD_NONE,
+        .bh_func = connect_bh,
+        .bh_opaque = s
+    };
+
+    qemu_mutex_init(&s->connect_thread->mutex);
+}
+
+static void nbd_free_connect_thread(NBDConnectThread *thr)
+{
+    if (thr->sioc) {
+        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
+    }
+    error_free(thr->err);
+    qapi_free_SocketAddress(thr->saddr);
+    g_free(thr);
+}
+
+static void *connect_thread_func(void *opaque)
+{
+    NBDConnectThread *thr = opaque;
+    int ret;
+    bool do_free = false;
+
+    thr->sioc = qio_channel_socket_new();
+
+    error_free(thr->err);
+    thr->err = NULL;
+    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
+    if (ret < 0) {
+        object_unref(OBJECT(thr->sioc));
+        thr->sioc = NULL;
+    }
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_RUNNING:
+        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
+        if (thr->bh_ctx) {
+            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);
+
+            /* play safe, don't reuse bh_ctx on further connection attempts */
+            thr->bh_ctx = NULL;
+        }
+        break;
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        do_free = true;
+        break;
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+    }
+
+    return NULL;
+}
+
+static QIOChannelSocket *coroutine_fn
+nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
+{
+    QemuThread thread;
+    BDRVNBDState *s = bs->opaque;
+    QIOChannelSocket *res;
+    NBDConnectThread *thr = s->connect_thread;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_FAIL:
+    case CONNECT_THREAD_NONE:
+        error_free(thr->err);
+        thr->err = NULL;
+        thr->state = CONNECT_THREAD_RUNNING;
+        qemu_thread_create(&thread, "nbd-connect",
+                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
+        break;
+    case CONNECT_THREAD_SUCCESS:
+        /* Previous attempt finally succeeded in background */
+        thr->state = CONNECT_THREAD_NONE;
+        res = thr->sioc;
+        thr->sioc = NULL;
+        qemu_mutex_unlock(&thr->mutex);
+        return res;
+    case CONNECT_THREAD_RUNNING:
+        /* Already running, will wait */
+        break;
+    default:
+        abort();
+    }
+
+    thr->bh_ctx = qemu_get_current_aio_context();
+
+    qemu_mutex_unlock(&thr->mutex);
+
+
+    /*
+     * We are going to wait for connect-thread finish, but
+     * nbd_client_co_drain_begin() can interrupt.
+     *
+     * Note that wait_connect variable is not visible for connect-thread. It
+     * doesn't need mutex protection, it used only inside home aio context of
+     * bs.
+     */
+    s->wait_connect = true;
+    qemu_coroutine_yield();
+
+    qemu_mutex_lock(&thr->mutex);
+
+    switch (thr->state) {
+    case CONNECT_THREAD_SUCCESS:
+    case CONNECT_THREAD_FAIL:
+        thr->state = CONNECT_THREAD_NONE;
+        error_propagate(errp, thr->err);
+        thr->err = NULL;
+        res = thr->sioc;
+        thr->sioc = NULL;
+        break;
+    case CONNECT_THREAD_RUNNING:
+    case CONNECT_THREAD_RUNNING_DETACHED:
+        /*
+         * Obviously, drained section wants to start. Report the attempt as
+         * failed. Still connect thread is executing in background, and its
+         * result may be used for next connection attempt.
+         */
+        res = NULL;
+        error_setg(errp, "Connection attempt cancelled by other operation");
+        break;
+
+    case CONNECT_THREAD_NONE:
+        /*
+         * Impossible. We've seen this thread running. So it should be
+         * running or at least give some results.
+         */
+        abort();
+
+    default:
+        abort();
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    return res;
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
+ * allow drained section to begin.
+ *
+ * If detach is true, also cleanup the state (or if thread is running, move it
+ * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
+ * detach is true.
+ */
+static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
+                                               bool detach)
+{
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+    NBDConnectThread *thr = s->connect_thread;
+    bool wake = false;
+    bool do_free = false;
+
+    qemu_mutex_lock(&thr->mutex);
+
+    if (thr->state == CONNECT_THREAD_RUNNING) {
+        /* We can cancel only in running state, when bh is not yet scheduled */
+        thr->bh_ctx = NULL;
+        if (s->wait_connect) {
+            s->wait_connect = false;
+            wake = true;
+        }
+        if (detach) {
+            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
+            s->connect_thread = NULL;
+        }
+    } else if (detach) {
+        do_free = true;
+    }
+
+    qemu_mutex_unlock(&thr->mutex);
+
+    if (do_free) {
+        nbd_free_connect_thread(thr);
+        s->connect_thread = NULL;
+    }
+
+    if (wake) {
+        aio_co_wake(s->connection_co);
+    }
+}
+
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     int ret;
@@ -289,7 +551,7 @@  static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    sioc = nbd_establish_connection(s->saddr, &local_err);
+    sioc = nbd_co_establish_connection(s->bs, &local_err);
     if (!sioc) {
         ret = -ECONNREFUSED;
         goto out;
@@ -1946,6 +2208,8 @@  static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     /* successfully connected */
     s->state = NBD_CLIENT_CONNECTED;
 
+    nbd_init_connect_thread(s);
+
     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
     bdrv_inc_in_flight(bs);
     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);