diff mbox series

[5/5] migration/multifd: Add a synchronization point for channel creation

Message ID 20240202191128.1901-6-farosas@suse.de
State New
Headers show
Series migration/multifd: Fix channel creation vs. cleanup races | expand

Commit Message

Fabiano Rosas Feb. 2, 2024, 7:11 p.m. UTC
It is possible that one of the multifd channels fails to be created at
multifd_new_send_channel_async() while the rest of the channel
creation tasks are still in flight.

This could lead to multifd_save_cleanup() executing the
qemu_thread_join() loop too early and not waiting for the threads
which haven't been created yet, leading to the freeing of resources
that the newly created threads will try to access and crash.

Add a synchronization point after which there will be no attempts at
thread creation and therefore calling multifd_save_cleanup() past that
point will ensure it properly waits for the threads.

A note about performance: Prior to this patch, if a channel took too
long to be established, other channels could finish connecting first
and already start taking load. Now we're bounded by the
slowest-connecting channel.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/multifd.c | 67 +++++++++++++++++++++++++--------------------
 1 file changed, 37 insertions(+), 30 deletions(-)

Comments

Peter Xu Feb. 5, 2024, 6:20 a.m. UTC | #1
On Fri, Feb 02, 2024 at 04:11:28PM -0300, Fabiano Rosas wrote:
> It is possible that one of the multifd channels fails to be created at
> multifd_new_send_channel_async() while the rest of the channel
> creation tasks are still in flight.
> 
> This could lead to multifd_save_cleanup() executing the
> qemu_thread_join() loop too early and not waiting for the threads
> which haven't been created yet, leading to the freeing of resources
> that the newly created threads will try to access and crash.
> 
> Add a synchronization point after which there will be no attempts at
> thread creation and therefore calling multifd_save_cleanup() past that
> point will ensure it properly waits for the threads.
> 
> A note about performance: Prior to this patch, if a channel took too
> long to be established, other channels could finish connecting first
> and already start taking load. Now we're bounded by the
> slowest-connecting channel.

Yes, I think this should (hopefully!) be fine.

> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
>  migration/multifd.c | 67 +++++++++++++++++++++++++--------------------
>  1 file changed, 37 insertions(+), 30 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 1851206352..888ac8b05d 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -360,6 +360,11 @@ struct {
>      MultiFDPages_t *pages;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    /*
> +     * Synchronization point past which no more channels will be
> +     * created.
> +     */
> +    QemuSemaphore channels_created;
>      /* send channels ready */
>      QemuSemaphore channels_ready;
>      /*
> @@ -561,6 +566,7 @@ void multifd_save_cleanup(void)
>              error_free(local_err);
>          }
>      }
> +    qemu_sem_destroy(&multifd_send_state->channels_created);
>      qemu_sem_destroy(&multifd_send_state->channels_ready);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> @@ -787,13 +793,6 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
>      trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
>  
>      migrate_set_error(migrate_get_current(), err);
> -    /*
> -     * Error happen, mark multifd_send_thread status as 'quit' although it
> -     * is not created, and then tell who pay attention to me.
> -     */
> -    p->quit = true;
> -    qemu_sem_post(&multifd_send_state->channels_ready);
> -    qemu_sem_post(&p->sem_sync);
>      error_free(err);
>  }
>  
> @@ -862,39 +861,37 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
>      return true;
>  }
>  
> -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
> -                                             QIOChannel *ioc, Error *err)
> -{
> -     migrate_set_error(migrate_get_current(), err);
> -     /* Error happen, we need to tell who pay attention to me */
> -     qemu_sem_post(&multifd_send_state->channels_ready);
> -     qemu_sem_post(&p->sem_sync);
> -     /*
> -      * Although multifd_send_thread is not created, but main migration
> -      * thread need to judge whether it is running, so we need to mark
> -      * its status.
> -      */
> -     p->quit = true;
> -     object_unref(OBJECT(ioc));
> -     error_free(err);
> -}
> -
>  static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>  {
>      MultiFDSendParams *p = opaque;
>      QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
>      Error *local_err = NULL;
> +    bool ret;
>  
>      trace_multifd_new_send_channel_async(p->id);
> -    if (!qio_task_propagate_error(task, &local_err)) {
> -        qio_channel_set_delay(ioc, false);
> -        if (multifd_channel_connect(p, ioc, &local_err)) {
> -            return;
> -        }
> +
> +    if (qio_task_propagate_error(task, &local_err)) {
> +        ret = false;
> +        goto out;
> +    }
> +
> +    qio_channel_set_delay(ioc, false);
> +    ret = multifd_channel_connect(p, ioc, &local_err);
> +
> +out:
> +    /*
> +     * Here we're not interested whether creation succeeded, only that
> +     * it happened at all.
> +     */
> +    qemu_sem_post(&multifd_send_state->channels_created);
> +    if (ret) {
> +        return;
>      }
>  
>      trace_multifd_new_send_channel_async_error(p->id, local_err);
> -    multifd_new_send_channel_cleanup(p, ioc, local_err);
> +    migrate_set_error(migrate_get_current(), local_err);
> +    object_unref(OBJECT(ioc));
> +    error_free(local_err);
>  }
>  
>  static void multifd_new_send_channel_create(gpointer opaque)
> @@ -918,6 +915,7 @@ bool multifd_save_setup(void)
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      multifd_send_state->pages = multifd_pages_init(page_count);
> +    qemu_sem_init(&multifd_send_state->channels_created, 0);
>      qemu_sem_init(&multifd_send_state->channels_ready, 0);
>      qatomic_set(&multifd_send_state->exiting, 0);
>      multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
> @@ -953,6 +951,15 @@ bool multifd_save_setup(void)
>          multifd_new_send_channel_create(p);
>      }
>  
> +    /*
> +     * Wait until channel creation has started for all channels. The
> +     * creation can still fail, but no more channels will be created
> +     * past this point.
> +     */

Let me double check with you here on the TLS use case.

IIUC we still can have more channels to be created if TLS is enabled: we
notify the sem as long as the handshake thread is created, then the
handshake thread can further create the tls-armed iochannel?  However I
think I get your point, and that is fine, because if that is the case, even
though this loop can complete before tls further creates the final channel,
we'll still see tls_thread_created==true and join() that tls thread first,
then further we'll join() the next multifd thread even if a new one will
pop up, or if it failed then nothing to join besides the tls thread.

I'm not sure whether Avihai has any input; I think this can be a good idea
indeed.  There's a dependency chain on the ordering if my above
understanding is correct; we may want to document this somewhere, perhaps
right here on the chaining of threads and how we handle that?

This may not allow a concurrent migrate_cancel to respond, but I assume
this is good enough; the migrate_cancel request is indeed at least so far
something I made up, but not a request from anyone.  We can leave that for
later and fix the race / crash first.  This seems to be a complete fix in
that regard.

> +    for (i = 0; i < thread_count; i++) {
> +        qemu_sem_wait(&multifd_send_state->channels_created);
> +    }
> +
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> -- 
> 2.35.3
> 

One other note: I think this also deserves a cc: stable.  But then it'll
mean patches 3/4 will also need to copy stable to make Michael's life
easier.

Let's also copy Dan when repost; after all he more or less owns the TLS
part.

Thanks!
Avihai Horon Feb. 5, 2024, 11:10 a.m. UTC | #2
On 05/02/2024 8:20, Peter Xu wrote:
> External email: Use caution opening links or attachments
>
>
> On Fri, Feb 02, 2024 at 04:11:28PM -0300, Fabiano Rosas wrote:
>> It is possible that one of the multifd channels fails to be created at
>> multifd_new_send_channel_async() while the rest of the channel
>> creation tasks are still in flight.
>>
>> This could lead to multifd_save_cleanup() executing the
>> qemu_thread_join() loop too early and not waiting for the threads
>> which haven't been created yet, leading to the freeing of resources
>> that the newly created threads will try to access and crash.
>>
>> Add a synchronization point after which there will be no attempts at
>> thread creation and therefore calling multifd_save_cleanup() past that
>> point will ensure it properly waits for the threads.
>>
>> A note about performance: Prior to this patch, if a channel took too
>> long to be established, other channels could finish connecting first
>> and already start taking load. Now we're bounded by the
>> slowest-connecting channel.
> Yes, I think this should (hopefully!) be fine.
>
>> Signed-off-by: Fabiano Rosas <farosas@suse.de>
>> ---
>>   migration/multifd.c | 67 +++++++++++++++++++++++++--------------------
>>   1 file changed, 37 insertions(+), 30 deletions(-)
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index 1851206352..888ac8b05d 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -360,6 +360,11 @@ struct {
>>       MultiFDPages_t *pages;
>>       /* global number of generated multifd packets */
>>       uint64_t packet_num;
>> +    /*
>> +     * Synchronization point past which no more channels will be
>> +     * created.
>> +     */
>> +    QemuSemaphore channels_created;
>>       /* send channels ready */
>>       QemuSemaphore channels_ready;
>>       /*
>> @@ -561,6 +566,7 @@ void multifd_save_cleanup(void)
>>               error_free(local_err);
>>           }
>>       }
>> +    qemu_sem_destroy(&multifd_send_state->channels_created);
>>       qemu_sem_destroy(&multifd_send_state->channels_ready);
>>       g_free(multifd_send_state->params);
>>       multifd_send_state->params = NULL;
>> @@ -787,13 +793,6 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
>>       trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
>>
>>       migrate_set_error(migrate_get_current(), err);
>> -    /*
>> -     * Error happen, mark multifd_send_thread status as 'quit' although it
>> -     * is not created, and then tell who pay attention to me.
>> -     */
>> -    p->quit = true;
>> -    qemu_sem_post(&multifd_send_state->channels_ready);
>> -    qemu_sem_post(&p->sem_sync);
>>       error_free(err);
>>   }
>>
>> @@ -862,39 +861,37 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
>>       return true;
>>   }
>>
>> -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
>> -                                             QIOChannel *ioc, Error *err)
>> -{
>> -     migrate_set_error(migrate_get_current(), err);
>> -     /* Error happen, we need to tell who pay attention to me */
>> -     qemu_sem_post(&multifd_send_state->channels_ready);
>> -     qemu_sem_post(&p->sem_sync);
>> -     /*
>> -      * Although multifd_send_thread is not created, but main migration
>> -      * thread need to judge whether it is running, so we need to mark
>> -      * its status.
>> -      */
>> -     p->quit = true;
>> -     object_unref(OBJECT(ioc));
>> -     error_free(err);
>> -}
>> -
>>   static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>>   {
>>       MultiFDSendParams *p = opaque;
>>       QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
>>       Error *local_err = NULL;
>> +    bool ret;
>>
>>       trace_multifd_new_send_channel_async(p->id);
>> -    if (!qio_task_propagate_error(task, &local_err)) {
>> -        qio_channel_set_delay(ioc, false);
>> -        if (multifd_channel_connect(p, ioc, &local_err)) {
>> -            return;
>> -        }
>> +
>> +    if (qio_task_propagate_error(task, &local_err)) {
>> +        ret = false;
>> +        goto out;
>> +    }
>> +
>> +    qio_channel_set_delay(ioc, false);
>> +    ret = multifd_channel_connect(p, ioc, &local_err);
>> +
>> +out:
>> +    /*
>> +     * Here we're not interested whether creation succeeded, only that
>> +     * it happened at all.
>> +     */
>> +    qemu_sem_post(&multifd_send_state->channels_created);
>> +    if (ret) {
>> +        return;
>>       }
>>
>>       trace_multifd_new_send_channel_async_error(p->id, local_err);
>> -    multifd_new_send_channel_cleanup(p, ioc, local_err);
>> +    migrate_set_error(migrate_get_current(), local_err);
>> +    object_unref(OBJECT(ioc));
>> +    error_free(local_err);
>>   }
>>
>>   static void multifd_new_send_channel_create(gpointer opaque)
>> @@ -918,6 +915,7 @@ bool multifd_save_setup(void)
>>       multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>>       multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>>       multifd_send_state->pages = multifd_pages_init(page_count);
>> +    qemu_sem_init(&multifd_send_state->channels_created, 0);
>>       qemu_sem_init(&multifd_send_state->channels_ready, 0);
>>       qatomic_set(&multifd_send_state->exiting, 0);
>>       multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
>> @@ -953,6 +951,15 @@ bool multifd_save_setup(void)
>>           multifd_new_send_channel_create(p);
>>       }
>>
>> +    /*
>> +     * Wait until channel creation has started for all channels. The
>> +     * creation can still fail, but no more channels will be created
>> +     * past this point.
>> +     */
> Let me double check with you here on the TLS use case.
>
> IIUC we still can have more channels to be created if TLS is enabled: we
> notify the sem as long as the handshake thread is created, then the
> handshake thread can further create the tls-armed iochannel?  However I
> think I get your point, and that is fine, because if that is the case, even
> though this loop can complete before tls further creates the final channel,
> we'll still see tls_thread_created==true and join() that tls thread first,
> then further we'll join() the next multifd thread even if a new one will
> pop up, or if it failed then nothing to join besides the tls thread.
>
> I'm not sure whether Avihai has any input, I think this can be a good idea
> indeed.

Nothing special, my understanding of this is the same as yours.
This fix looks solid.

>    there's a dependency chain on the ordering if my above
> undertanding is correct; we may want to document this somewhere, perhaps
> right here on the chaining of threads and how we handle that?

I agree, this is subtle and may deserve a small note or hint.

>
> This may not allow a concurrent migrate_cancel to respond, but I assume
> this is good enough; the migrate_cancel request is indeed at least so far
> something I made up, but not a request from anyone.  We can leave that for
> later and fix the race / crash first.  This seems to be a complete fix from
> that regard.
>
>> +    for (i = 0; i < thread_count; i++) {
>> +        qemu_sem_wait(&multifd_send_state->channels_created);
>> +    }
>> +
>>       for (i = 0; i < thread_count; i++) {
>>           MultiFDSendParams *p = &multifd_send_state->params[i];
>>
>> --
>> 2.35.3
>>
> One other note is I think this will also deserve a cc: stable? But then
> it'll mean all patch 3/4 will also need to copy stable to make Michael's
> life easier.
>
> Let's also copy Dan when repost; after all he more or less owns the TLS
> part.
>
> Thanks!
>
> --
> Peter Xu
>
Peter Xu Feb. 5, 2024, 12:53 p.m. UTC | #3
On Mon, Feb 05, 2024 at 01:10:14PM +0200, Avihai Horon wrote:
> 
> On 05/02/2024 8:20, Peter Xu wrote:
> > External email: Use caution opening links or attachments
> > 
> > 
> > On Fri, Feb 02, 2024 at 04:11:28PM -0300, Fabiano Rosas wrote:
> > > It is possible that one of the multifd channels fails to be created at
> > > multifd_new_send_channel_async() while the rest of the channel
> > > creation tasks are still in flight.
> > > 
> > > This could lead to multifd_save_cleanup() executing the
> > > qemu_thread_join() loop too early and not waiting for the threads
> > > which haven't been created yet, leading to the freeing of resources
> > > that the newly created threads will try to access and crash.
> > > 
> > > Add a synchronization point after which there will be no attempts at
> > > thread creation and therefore calling multifd_save_cleanup() past that
> > > point will ensure it properly waits for the threads.
> > > 
> > > A note about performance: Prior to this patch, if a channel took too
> > > long to be established, other channels could finish connecting first
> > > and already start taking load. Now we're bounded by the
> > > slowest-connecting channel.
> > Yes, I think this should (hopefully!) be fine.
> > 
> > > Signed-off-by: Fabiano Rosas <farosas@suse.de>
> > > ---
> > >   migration/multifd.c | 67 +++++++++++++++++++++++++--------------------
> > >   1 file changed, 37 insertions(+), 30 deletions(-)
> > > 
> > > diff --git a/migration/multifd.c b/migration/multifd.c
> > > index 1851206352..888ac8b05d 100644
> > > --- a/migration/multifd.c
> > > +++ b/migration/multifd.c
> > > @@ -360,6 +360,11 @@ struct {
> > >       MultiFDPages_t *pages;
> > >       /* global number of generated multifd packets */
> > >       uint64_t packet_num;
> > > +    /*
> > > +     * Synchronization point past which no more channels will be
> > > +     * created.
> > > +     */
> > > +    QemuSemaphore channels_created;
> > >       /* send channels ready */
> > >       QemuSemaphore channels_ready;
> > >       /*
> > > @@ -561,6 +566,7 @@ void multifd_save_cleanup(void)
> > >               error_free(local_err);
> > >           }
> > >       }
> > > +    qemu_sem_destroy(&multifd_send_state->channels_created);
> > >       qemu_sem_destroy(&multifd_send_state->channels_ready);
> > >       g_free(multifd_send_state->params);
> > >       multifd_send_state->params = NULL;
> > > @@ -787,13 +793,6 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
> > >       trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
> > > 
> > >       migrate_set_error(migrate_get_current(), err);
> > > -    /*
> > > -     * Error happen, mark multifd_send_thread status as 'quit' although it
> > > -     * is not created, and then tell who pay attention to me.
> > > -     */
> > > -    p->quit = true;
> > > -    qemu_sem_post(&multifd_send_state->channels_ready);
> > > -    qemu_sem_post(&p->sem_sync);
> > >       error_free(err);
> > >   }
> > > 
> > > @@ -862,39 +861,37 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
> > >       return true;
> > >   }
> > > 
> > > -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
> > > -                                             QIOChannel *ioc, Error *err)
> > > -{
> > > -     migrate_set_error(migrate_get_current(), err);
> > > -     /* Error happen, we need to tell who pay attention to me */
> > > -     qemu_sem_post(&multifd_send_state->channels_ready);
> > > -     qemu_sem_post(&p->sem_sync);
> > > -     /*
> > > -      * Although multifd_send_thread is not created, but main migration
> > > -      * thread need to judge whether it is running, so we need to mark
> > > -      * its status.
> > > -      */
> > > -     p->quit = true;
> > > -     object_unref(OBJECT(ioc));
> > > -     error_free(err);
> > > -}
> > > -
> > >   static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
> > >   {
> > >       MultiFDSendParams *p = opaque;
> > >       QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
> > >       Error *local_err = NULL;
> > > +    bool ret;
> > > 
> > >       trace_multifd_new_send_channel_async(p->id);
> > > -    if (!qio_task_propagate_error(task, &local_err)) {
> > > -        qio_channel_set_delay(ioc, false);
> > > -        if (multifd_channel_connect(p, ioc, &local_err)) {
> > > -            return;
> > > -        }
> > > +
> > > +    if (qio_task_propagate_error(task, &local_err)) {
> > > +        ret = false;
> > > +        goto out;
> > > +    }
> > > +
> > > +    qio_channel_set_delay(ioc, false);
> > > +    ret = multifd_channel_connect(p, ioc, &local_err);
> > > +
> > > +out:
> > > +    /*
> > > +     * Here we're not interested whether creation succeeded, only that
> > > +     * it happened at all.
> > > +     */
> > > +    qemu_sem_post(&multifd_send_state->channels_created);
> > > +    if (ret) {
> > > +        return;
> > >       }
> > > 
> > >       trace_multifd_new_send_channel_async_error(p->id, local_err);
> > > -    multifd_new_send_channel_cleanup(p, ioc, local_err);
> > > +    migrate_set_error(migrate_get_current(), local_err);
> > > +    object_unref(OBJECT(ioc));
> > > +    error_free(local_err);
> > >   }
> > > 
> > >   static void multifd_new_send_channel_create(gpointer opaque)
> > > @@ -918,6 +915,7 @@ bool multifd_save_setup(void)
> > >       multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
> > >       multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> > >       multifd_send_state->pages = multifd_pages_init(page_count);
> > > +    qemu_sem_init(&multifd_send_state->channels_created, 0);
> > >       qemu_sem_init(&multifd_send_state->channels_ready, 0);
> > >       qatomic_set(&multifd_send_state->exiting, 0);
> > >       multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
> > > @@ -953,6 +951,15 @@ bool multifd_save_setup(void)
> > >           multifd_new_send_channel_create(p);
> > >       }
> > > 
> > > +    /*
> > > +     * Wait until channel creation has started for all channels. The
> > > +     * creation can still fail, but no more channels will be created
> > > +     * past this point.
> > > +     */
> > Let me double check with you here on the TLS use case.
> > 
> > IIUC we still can have more channels to be created if TLS is enabled: we
> > notify the sem as long as the handshake thread is created, then the
> > handshake thread can further create the tls-armed iochannel?  However I
> > think I get your point, and that is fine, because if that is the case, even
> > though this loop can complete before tls further creates the final channel,
> > we'll still see tls_thread_created==true and join() that tls thread first,
> > then further we'll join() the next multifd thread even if a new one will
> > pop up, or if it failed then nothing to join besides the tls thread.
> > 
> > I'm not sure whether Avihai has any input, I think this can be a good idea
> > indeed.
> 
> Nothing special, my understanding of this is the same as yours.
> This fix looks solid.
> 
> >    there's a dependency chain on the ordering if my above
> > undertanding is correct; we may want to document this somewhere, perhaps
> > right here on the chaining of threads and how we handle that?
> 
> I agree, this is subtle and may deserve a small note or hint.

IMHO it'll be always better to be verbose on these than "not enough info".

One thing I'd also like a comment on is that the order is now a must: first
join the tls threads, then the multifd threads; the reverse order is no
longer acceptable. We may want a comment above the two join()s there to
state this hard requirement.

> 
> > 
> > This may not allow a concurrent migrate_cancel to respond, but I assume
> > this is good enough; the migrate_cancel request is indeed at least so far
> > something I made up, but not a request from anyone.  We can leave that for
> > later and fix the race / crash first.  This seems to be a complete fix from
> > that regard.
> > 
> > > +    for (i = 0; i < thread_count; i++) {
> > > +        qemu_sem_wait(&multifd_send_state->channels_created);
> > > +    }
> > > +
> > >       for (i = 0; i < thread_count; i++) {
> > >           MultiFDSendParams *p = &multifd_send_state->params[i];
> > > 
> > > --
> > > 2.35.3
> > > 
> > One other note is I think this will also deserve a cc: stable? But then
> > it'll mean all patch 3/4 will also need to copy stable to make Michael's
> > life easier.
> > 
> > Let's also copy Dan when repost; after all he more or less owns the TLS
> > part.
> > 
> > Thanks!
> > 
> > --
> > Peter Xu
> > 
>
Fabiano Rosas Feb. 5, 2024, 3:41 p.m. UTC | #4
Peter Xu <peterx@redhat.com> writes:

> On Mon, Feb 05, 2024 at 01:10:14PM +0200, Avihai Horon wrote:
>> 
>> On 05/02/2024 8:20, Peter Xu wrote:
>> > External email: Use caution opening links or attachments
>> > 
>> > 
>> > On Fri, Feb 02, 2024 at 04:11:28PM -0300, Fabiano Rosas wrote:
>> > > It is possible that one of the multifd channels fails to be created at
>> > > multifd_new_send_channel_async() while the rest of the channel
>> > > creation tasks are still in flight.
>> > > 
>> > > This could lead to multifd_save_cleanup() executing the
>> > > qemu_thread_join() loop too early and not waiting for the threads
>> > > which haven't been created yet, leading to the freeing of resources
>> > > that the newly created threads will try to access and crash.
>> > > 
>> > > Add a synchronization point after which there will be no attempts at
>> > > thread creation and therefore calling multifd_save_cleanup() past that
>> > > point will ensure it properly waits for the threads.
>> > > 
>> > > A note about performance: Prior to this patch, if a channel took too
>> > > long to be established, other channels could finish connecting first
>> > > and already start taking load. Now we're bounded by the
>> > > slowest-connecting channel.
>> > Yes, I think this should (hopefully!) be fine.
>> > 
>> > > Signed-off-by: Fabiano Rosas <farosas@suse.de>
>> > > ---
>> > >   migration/multifd.c | 67 +++++++++++++++++++++++++--------------------
>> > >   1 file changed, 37 insertions(+), 30 deletions(-)
>> > > 
>> > > diff --git a/migration/multifd.c b/migration/multifd.c
>> > > index 1851206352..888ac8b05d 100644
>> > > --- a/migration/multifd.c
>> > > +++ b/migration/multifd.c
>> > > @@ -360,6 +360,11 @@ struct {
>> > >       MultiFDPages_t *pages;
>> > >       /* global number of generated multifd packets */
>> > >       uint64_t packet_num;
>> > > +    /*
>> > > +     * Synchronization point past which no more channels will be
>> > > +     * created.
>> > > +     */
>> > > +    QemuSemaphore channels_created;
>> > >       /* send channels ready */
>> > >       QemuSemaphore channels_ready;
>> > >       /*
>> > > @@ -561,6 +566,7 @@ void multifd_save_cleanup(void)
>> > >               error_free(local_err);
>> > >           }
>> > >       }
>> > > +    qemu_sem_destroy(&multifd_send_state->channels_created);
>> > >       qemu_sem_destroy(&multifd_send_state->channels_ready);
>> > >       g_free(multifd_send_state->params);
>> > >       multifd_send_state->params = NULL;
>> > > @@ -787,13 +793,6 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
>> > >       trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
>> > > 
>> > >       migrate_set_error(migrate_get_current(), err);
>> > > -    /*
>> > > -     * Error happen, mark multifd_send_thread status as 'quit' although it
>> > > -     * is not created, and then tell who pay attention to me.
>> > > -     */
>> > > -    p->quit = true;
>> > > -    qemu_sem_post(&multifd_send_state->channels_ready);
>> > > -    qemu_sem_post(&p->sem_sync);
>> > >       error_free(err);
>> > >   }
>> > > 
>> > > @@ -862,39 +861,37 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
>> > >       return true;
>> > >   }
>> > > 
>> > > -static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
>> > > -                                             QIOChannel *ioc, Error *err)
>> > > -{
>> > > -     migrate_set_error(migrate_get_current(), err);
>> > > -     /* Error happen, we need to tell who pay attention to me */
>> > > -     qemu_sem_post(&multifd_send_state->channels_ready);
>> > > -     qemu_sem_post(&p->sem_sync);
>> > > -     /*
>> > > -      * Although multifd_send_thread is not created, but main migration
>> > > -      * thread need to judge whether it is running, so we need to mark
>> > > -      * its status.
>> > > -      */
>> > > -     p->quit = true;
>> > > -     object_unref(OBJECT(ioc));
>> > > -     error_free(err);
>> > > -}
>> > > -
>> > >   static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>> > >   {
>> > >       MultiFDSendParams *p = opaque;
>> > >       QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
>> > >       Error *local_err = NULL;
>> > > +    bool ret;
>> > > 
>> > >       trace_multifd_new_send_channel_async(p->id);
>> > > -    if (!qio_task_propagate_error(task, &local_err)) {
>> > > -        qio_channel_set_delay(ioc, false);
>> > > -        if (multifd_channel_connect(p, ioc, &local_err)) {
>> > > -            return;
>> > > -        }
>> > > +
>> > > +    if (qio_task_propagate_error(task, &local_err)) {
>> > > +        ret = false;
>> > > +        goto out;
>> > > +    }
>> > > +
>> > > +    qio_channel_set_delay(ioc, false);
>> > > +    ret = multifd_channel_connect(p, ioc, &local_err);
>> > > +
>> > > +out:
>> > > +    /*
>> > > +     * Here we're not interested whether creation succeeded, only that
>> > > +     * it happened at all.
>> > > +     */
>> > > +    qemu_sem_post(&multifd_send_state->channels_created);
>> > > +    if (ret) {
>> > > +        return;
>> > >       }
>> > > 
>> > >       trace_multifd_new_send_channel_async_error(p->id, local_err);
>> > > -    multifd_new_send_channel_cleanup(p, ioc, local_err);
>> > > +    migrate_set_error(migrate_get_current(), local_err);
>> > > +    object_unref(OBJECT(ioc));
>> > > +    error_free(local_err);
>> > >   }
>> > > 
>> > >   static void multifd_new_send_channel_create(gpointer opaque)
>> > > @@ -918,6 +915,7 @@ bool multifd_save_setup(void)
>> > >       multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>> > >       multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>> > >       multifd_send_state->pages = multifd_pages_init(page_count);
>> > > +    qemu_sem_init(&multifd_send_state->channels_created, 0);
>> > >       qemu_sem_init(&multifd_send_state->channels_ready, 0);
>> > >       qatomic_set(&multifd_send_state->exiting, 0);
>> > >       multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
>> > > @@ -953,6 +951,15 @@ bool multifd_save_setup(void)
>> > >           multifd_new_send_channel_create(p);
>> > >       }
>> > > 
>> > > +    /*
>> > > +     * Wait until channel creation has started for all channels. The
>> > > +     * creation can still fail, but no more channels will be created
>> > > +     * past this point.
>> > > +     */
>> > Let me double check with you here on the TLS use case.
>> > 
>> > IIUC we still can have more channels to be created if TLS is enabled: we
>> > notify the sem as long as the handshake thread is created, then the
>> > handshake thread can further create the tls-armed iochannel?

Oh, that's just a mistake on my part. We cannot allow "no more channels
will be created" above to be a lie.

I'll fix it for v2. I just found a way to stop calling
multifd_channel_connect() twice so now I can have the TLS thread posting
the semaphore at the appropriate place.

>> > However I
>> > think I get your point, and that is fine, because if that is the case, even
>> > though this loop can complete before tls further creates the final channel,
>> > we'll still see tls_thread_created==true and join() that tls thread first,
>> > then further we'll join() the next multifd thread even if a new one will
>> > pop up, or if it failed then nothing to join besides the tls thread.
>> > 
>> > I'm not sure whether Avihai has any input, I think this can be a good idea
>> > indeed.
>> 
>> Nothing special, my understanding of this is the same as yours.
>> This fix looks solid.
>> 
>> >    there's a dependency chain on the ordering if my above
>> > undertanding is correct; we may want to document this somewhere, perhaps
>> > right here on the chaining of threads and how we handle that?
>> 
>> I agree, this is subtle and may deserve a small note or hint.
>
> IMHO it'll be always better to be verbose on these than "not enough info".
>
> One thing I'd also like a comment is now the order is a must to firstly
> join tls threads then multifd threads, not vice versa, not anymore. We may
> want a comment above the two join()s there to state this hard requirement.
>
>> 
>> > 
>> > This may not allow a concurrent migrate_cancel to respond, but I assume
>> > this is good enough; the migrate_cancel request is indeed at least so far
>> > something I made up, but not a request from anyone.  We can leave that for
>> > later and fix the race / crash first.  This seems to be a complete fix from
>> > that regard.
>> > 
>> > > +    for (i = 0; i < thread_count; i++) {
>> > > +        qemu_sem_wait(&multifd_send_state->channels_created);
>> > > +    }
>> > > +
>> > >       for (i = 0; i < thread_count; i++) {
>> > >           MultiFDSendParams *p = &multifd_send_state->params[i];
>> > > 
>> > > --
>> > > 2.35.3
>> > > 
>> > One other note is I think this will also deserve a cc: stable? But then
>> > it'll mean all patch 3/4 will also need to copy stable to make Michael's
>> > life easier.
>> > 
>> > Let's also copy Dan when repost; after all he more or less owns the TLS
>> > part.
>> > 
>> > Thanks!
>> > 
>> > --
>> > Peter Xu
>> > 
>>
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 1851206352..888ac8b05d 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -360,6 +360,11 @@  struct {
     MultiFDPages_t *pages;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /*
+     * Synchronization point past which no more channels will be
+     * created.
+     */
+    QemuSemaphore channels_created;
     /* send channels ready */
     QemuSemaphore channels_ready;
     /*
@@ -561,6 +566,7 @@  void multifd_save_cleanup(void)
             error_free(local_err);
         }
     }
+    qemu_sem_destroy(&multifd_send_state->channels_created);
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -787,13 +793,6 @@  static void multifd_tls_outgoing_handshake(QIOTask *task,
     trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
 
     migrate_set_error(migrate_get_current(), err);
-    /*
-     * Error happen, mark multifd_send_thread status as 'quit' although it
-     * is not created, and then tell who pay attention to me.
-     */
-    p->quit = true;
-    qemu_sem_post(&multifd_send_state->channels_ready);
-    qemu_sem_post(&p->sem_sync);
     error_free(err);
 }
 
@@ -862,39 +861,37 @@  static bool multifd_channel_connect(MultiFDSendParams *p,
     return true;
 }
 
-static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
-                                             QIOChannel *ioc, Error *err)
-{
-     migrate_set_error(migrate_get_current(), err);
-     /* Error happen, we need to tell who pay attention to me */
-     qemu_sem_post(&multifd_send_state->channels_ready);
-     qemu_sem_post(&p->sem_sync);
-     /*
-      * Although multifd_send_thread is not created, but main migration
-      * thread need to judge whether it is running, so we need to mark
-      * its status.
-      */
-     p->quit = true;
-     object_unref(OBJECT(ioc));
-     error_free(err);
-}
-
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
     MultiFDSendParams *p = opaque;
     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
     Error *local_err = NULL;
+    bool ret;
 
     trace_multifd_new_send_channel_async(p->id);
-    if (!qio_task_propagate_error(task, &local_err)) {
-        qio_channel_set_delay(ioc, false);
-        if (multifd_channel_connect(p, ioc, &local_err)) {
-            return;
-        }
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        ret = false;
+        goto out;
+    }
+
+    qio_channel_set_delay(ioc, false);
+    ret = multifd_channel_connect(p, ioc, &local_err);
+
+out:
+    /*
+     * Here we're not interested whether creation succeeded, only that
+     * it happened at all.
+     */
+    qemu_sem_post(&multifd_send_state->channels_created);
+    if (ret) {
+        return;
     }
 
     trace_multifd_new_send_channel_async_error(p->id, local_err);
-    multifd_new_send_channel_cleanup(p, ioc, local_err);
+    migrate_set_error(migrate_get_current(), local_err);
+    object_unref(OBJECT(ioc));
+    error_free(local_err);
 }
 
 static void multifd_new_send_channel_create(gpointer opaque)
@@ -918,6 +915,7 @@  bool multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->pages = multifd_pages_init(page_count);
+    qemu_sem_init(&multifd_send_state->channels_created, 0);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
     qatomic_set(&multifd_send_state->exiting, 0);
     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -953,6 +951,15 @@  bool multifd_save_setup(void)
         multifd_new_send_channel_create(p);
     }
 
+    /*
+     * Wait until channel creation has started for all channels. The
+     * creation can still fail, but no more channels will be created
+     * past this point.
+     */
+    for (i = 0; i < thread_count; i++) {
+        qemu_sem_wait(&multifd_send_state->channels_created);
+    }
+
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];