
[RFC,v2,1/6] migration/multifd: Remove channels_ready semaphore

Message ID 20231012140651.13122-2-farosas@suse.de
State New
Series migration/multifd: Locking changes

Commit Message

Fabiano Rosas Oct. 12, 2023, 2:06 p.m. UTC
The channels_ready semaphore is a global variable not linked to any
single multifd channel. Waiting on it only means that "some" channel
has become ready to send data. Since we need to address the channels
by index (multifd_send_state->params[i]), that information adds
nothing of value. The channel being addressed is not necessarily the
one that just released the semaphore.

The only usage of this semaphore that makes sense is to wait for it in
a loop that iterates for the number of channels. That could mean: all
channels have been setup and are operational OR all channels have
finished their work and are idle.

Currently all code that waits on channels_ready is redundant. There is
always a subsequent lock or semaphore that does the actual data
protection/synchronization.

- at multifd_send_pages: Waiting on channels_ready doesn't mean the
  'next_channel' is ready, it could be any other channel. So there are
  already cases where this code runs as if no semaphore was there.

  Waiting outside of the loop is also incorrect because if the current
  channel already has a pending_job, the code will move on to the next
  one without waiting on the semaphore, and the count will be greater
  than zero at the end of the execution.

  Checking that "any" channel is ready as a proxy for all channels
  being ready would work, but it's not what the code is doing and not
  really needed because the channel lock and 'sem' would be enough.

- at multifd_send_sync: This usage is correct, but it is made
  redundant by the wait on sem_sync. What this piece of code is doing
  is making sure all channels have sent the SYNC packet and become
  idle afterwards.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/multifd.c | 10 ----------
 1 file changed, 10 deletions(-)

Comments

Juan Quintela Oct. 19, 2023, 9:06 a.m. UTC | #1
Fabiano Rosas <farosas@suse.de> wrote:
> The channels_ready semaphore is a global variable not linked to any
> single multifd channel. Waiting on it only means that "some" channel
> has become ready to send data. Since we need to address the channels
> by index (multifd_send_state->params[i]), that information adds
> nothing of value.

NAK.

I disagree here O:-)

the reason that semaphore exists is multifd_send_pages()

And, simplifying, what the function does is:

sem_wait(channels_ready);

for_each_channel()
   look if it is empty()

But with the semaphore, we guarantee that when we go to the loop, there
is a channel ready, so we know we don't busy wait searching for a
channel that is free.

Notice that I fully agree that the sem is not needed for locking.
Locking is done with the mutex.  It is just used to make sure that we
don't busy loop on that loop.

And we use a sem, because it is the easiest way to know how many
channels are ready (even when we only care if there is at least one when
we arrive at that code).
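
Schematically, and simplifying a lot, the pairing is:

    /* each send thread, in multifd_send_thread() */
    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready); /* "one more idle channel" */
        qemu_sem_wait(&p->sem);                             /* wait to be given work */
        /* ... send the pending job ... */
    }

    /* migration thread, in multifd_send_pages() */
    qemu_sem_wait(&multifd_send_state->channels_ready);     /* consume one "idle" token */
    /* ... then search params[] for a channel with no pending_job ... */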

We lost count of that counter, and we fixed that here:

commit d2026ee117147893f8d80f060cede6d872ecbd7f
Author: Juan Quintela <quintela@redhat.com>
Date:   Wed Apr 26 12:20:36 2023 +0200

    multifd: Fix the number of channels ready

    We don't wait in the sem when we are doing a sync_main.  Make it

And we were addressing the problem that some users were finding that we
were busy waiting in that loop.

> The channel being addressed is not necessarily the
> one that just released the semaphore.

We only care that there is at least one free.  We are going to search
the next one.

Does this explanation make sense?

Later, Juan.

> The only usage of this semaphore that makes sense is to wait for it in
> a loop that iterates for the number of channels. That could mean: all
> channels have been setup and are operational OR all channels have
> finished their work and are idle.
>
> Currently all code that waits on channels_ready is redundant. There is
> always a subsequent lock or semaphore that does the actual data
> protection/synchronization.
>
> - at multifd_send_pages: Waiting on channels_ready doesn't mean the
>   'next_channel' is ready, it could be any other channel. So there are
>   already cases where this code runs as if no semaphore was there.

>   Waiting outside of the loop is also incorrect because if the current
>   channel already has a pending_job, then it will loop into the next
>   one without waiting the semaphore and the count will be greater than
>   zero at the end of the execution.
>
>   Checking that "any" channel is ready as a proxy for all channels
>   being ready would work, but it's not what the code is doing and not
>   really needed because the channel lock and 'sem' would be enough.
>
> - at multifd_send_sync: This usage is correct, but it is made
>   redundant by the wait on sem_sync. What this piece of code is doing
>   is making sure all channels have sent the SYNC packet and became
>   idle afterwards.
>
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
>  migration/multifd.c | 10 ----------
>  1 file changed, 10 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 0f6b203877..e26f5f246d 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -362,8 +362,6 @@ struct {
>      MultiFDPages_t *pages;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> -    /* send channels ready */
> -    QemuSemaphore channels_ready;
>      /*
>       * Have we already run terminate threads.  There is a race when it
>       * happens that we got one error while we are exiting.
> @@ -403,7 +401,6 @@ static int multifd_send_pages(QEMUFile *f)
>          return -1;
>      }
>  
> -    qemu_sem_wait(&multifd_send_state->channels_ready);
>      /*
>       * next_channel can remain from a previous migration that was
>       * using more channels, so ensure it doesn't overflow if the
> @@ -554,7 +551,6 @@ void multifd_save_cleanup(void)
>              error_free(local_err);
>          }
>      }
> -    qemu_sem_destroy(&multifd_send_state->channels_ready);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
>      multifd_pages_clear(multifd_send_state->pages);
> @@ -630,7 +626,6 @@ int multifd_send_sync_main(QEMUFile *f)
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> -        qemu_sem_wait(&multifd_send_state->channels_ready);
>          trace_multifd_send_sync_main_wait(p->id);
>          qemu_sem_wait(&p->sem_sync);
>  
> @@ -664,7 +659,6 @@ static void *multifd_send_thread(void *opaque)
>      p->num_packets = 1;
>  
>      while (true) {
> -        qemu_sem_post(&multifd_send_state->channels_ready);
>          qemu_sem_wait(&p->sem);
>  
>          if (qatomic_read(&multifd_send_state->exiting)) {
> @@ -759,7 +753,6 @@ out:
>       */
>      if (ret != 0) {
>          qemu_sem_post(&p->sem_sync);
> -        qemu_sem_post(&multifd_send_state->channels_ready);
>      }
>  
>      qemu_mutex_lock(&p->mutex);
> @@ -796,7 +789,6 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
>           * is not created, and then tell who pay attention to me.
>           */
>          p->quit = true;
> -        qemu_sem_post(&multifd_send_state->channels_ready);
>          qemu_sem_post(&p->sem_sync);
>      }
>  }
> @@ -874,7 +866,6 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
>  {
>       migrate_set_error(migrate_get_current(), err);
>       /* Error happen, we need to tell who pay attention to me */
> -     qemu_sem_post(&multifd_send_state->channels_ready);
>       qemu_sem_post(&p->sem_sync);
>       /*
>        * Although multifd_send_thread is not created, but main migration
> @@ -919,7 +910,6 @@ int multifd_save_setup(Error **errp)
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      multifd_send_state->pages = multifd_pages_init(page_count);
> -    qemu_sem_init(&multifd_send_state->channels_ready, 0);
>      qatomic_set(&multifd_send_state->exiting, 0);
>      multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
Peter Xu Oct. 19, 2023, 2:35 p.m. UTC | #2
Fabiano,

Sorry to look at this series late; I messed up my inbox after I reworked my
arrangement methodology of emails. ;)

On Thu, Oct 19, 2023 at 11:06:06AM +0200, Juan Quintela wrote:
> Fabiano Rosas <farosas@suse.de> wrote:
> > The channels_ready semaphore is a global variable not linked to any
> > single multifd channel. Waiting on it only means that "some" channel
> > has become ready to send data. Since we need to address the channels
> > by index (multifd_send_state->params[i]), that information adds
> > nothing of value.
> 
> NAK.
> 
> I disagree here O:-)
> 
> the reason why that channel exist is for multifd_send_pages()
> 
> And simplifying the function what it does is:
> 
> sem_wait(channels_ready);
> 
> for_each_channel()
>    look if it is empty()
> 
> But with the semaphore, we guarantee that when we go to the loop, there
> is a channel ready, so we know we donat busy wait searching for a
> channel that is free.
> 
> Notice that I fully agree that the sem is not needed for locking.
> Locking is done with the mutex.  It is just used to make sure that we
> don't busy loop on that loop.
> 
> And we use a sem, because it is the easiest way to know how many
> channels are ready (even when we only care if there is one when we
> arrive to that code).
> 
> We lost count of that counter, and we fixed that here:
> 
> commit d2026ee117147893f8d80f060cede6d872ecbd7f
> Author: Juan Quintela <quintela@redhat.com>
> Date:   Wed Apr 26 12:20:36 2023 +0200
> 
>     multifd: Fix the number of channels ready
> 
>     We don't wait in the sem when we are doing a sync_main.  Make it
> 
> And we were addressing the problem that some users where finding that we
> were busy waiting on that loop.

Juan,

I can understand why send_pages needs that sem, but not when sync main.
IOW, why multifd_send_sync_main() needs:

        qemu_sem_wait(&multifd_send_state->channels_ready);

If it has:

        qemu_sem_wait(&p->sem_sync);

How does a busy loop happen?

Thanks,
Fabiano Rosas Oct. 19, 2023, 2:55 p.m. UTC | #3
Juan Quintela <quintela@redhat.com> writes:

> Fabiano Rosas <farosas@suse.de> wrote:
>> The channels_ready semaphore is a global variable not linked to any
>> single multifd channel. Waiting on it only means that "some" channel
>> has become ready to send data. Since we need to address the channels
>> by index (multifd_send_state->params[i]), that information adds
>> nothing of value.
>
> NAK.
>
> I disagree here O:-)
>
> the reason why that channel exist is for multifd_send_pages()
>
> And simplifying the function what it does is:
>
> sem_wait(channels_ready);
>
> for_each_channel()
>    look if it is empty()
>
> But with the semaphore, we guarantee that when we go to the loop, there
> is a channel ready, so we know we donat busy wait searching for a
> channel that is free.
>

Ok, so that clarifies the channels_ready usage.

Now, thinking out loud... can't we simply (famous last words) remove the
"if (!p->pending_job)" line and let multifd_send_pages() prepare another
payload for the channel? That way multifd_send_pages() could already
return and the channel would see one more pending_job and proceed to
send it.

Or, since there's no resending anyway, we could decrement pending_jobs
earlier, before unlocking the channel. It seems the channel could be made
ready for another job as soon as the packet is built and the lock is
released.

That way we could remove the semaphore and let the mutex do the job of
waiting for the channel to become ready.

> Notice that I fully agree that the sem is not needed for locking.
> Locking is done with the mutex.  It is just used to make sure that we
> don't busy loop on that loop.
>
> And we use a sem, because it is the easiest way to know how many
> channels are ready (even when we only care if there is one when we
> arrive to that code).

Yep, that's fine, no objection here.

>
> We lost count of that counter, and we fixed that here:

Kind of, because we still don't wait on it during cleanup. If we did,
then we could have an assert at the end to make sure this doesn't
regress again.

And maybe use channels_ready.count for other types of introspection.

> commit d2026ee117147893f8d80f060cede6d872ecbd7f
> Author: Juan Quintela <quintela@redhat.com>
> Date:   Wed Apr 26 12:20:36 2023 +0200
>
>     multifd: Fix the number of channels ready
>
>     We don't wait in the sem when we are doing a sync_main.  Make it
>
> And we were addressing the problem that some users where finding that we
> were busy waiting on that loop.
>
>> The channel being addressed is not necessarily the
>> one that just released the semaphore.
>
> We only care that there is at least one free.  We are going to search
> the next one.
>
> Does this explanation makes sense?

It does, thanks for taking the time to educate us =)

I made some suggestions above, but I might be missing something still.
Juan Quintela Oct. 19, 2023, 3 p.m. UTC | #4
Peter Xu <peterx@redhat.com> wrote:
> Fabiano,
>
> Sorry to look at this series late; I messed up my inbox after I reworked my
> arrangement methodology of emails. ;)
>
> On Thu, Oct 19, 2023 at 11:06:06AM +0200, Juan Quintela wrote:
>> Fabiano Rosas <farosas@suse.de> wrote:
>> > The channels_ready semaphore is a global variable not linked to any
>> > single multifd channel. Waiting on it only means that "some" channel
>> > has become ready to send data. Since we need to address the channels
>> > by index (multifd_send_state->params[i]), that information adds
>> > nothing of value.
>> 
>> NAK.
>> 
>> I disagree here O:-)
>> 
>> the reason why that channel exist is for multifd_send_pages()
>> 
>> And simplifying the function what it does is:
>> 
>> sem_wait(channels_ready);
>> 
>> for_each_channel()
>>    look if it is empty()
>> 
>> But with the semaphore, we guarantee that when we go to the loop, there
>> is a channel ready, so we know we donat busy wait searching for a
>> channel that is free.
>> 
>> Notice that I fully agree that the sem is not needed for locking.
>> Locking is done with the mutex.  It is just used to make sure that we
>> don't busy loop on that loop.
>> 
>> And we use a sem, because it is the easiest way to know how many
>> channels are ready (even when we only care if there is one when we
>> arrive to that code).
>> 
>> We lost count of that counter, and we fixed that here:
>> 
>> commit d2026ee117147893f8d80f060cede6d872ecbd7f
>> Author: Juan Quintela <quintela@redhat.com>
>> Date:   Wed Apr 26 12:20:36 2023 +0200
>> 
>>     multifd: Fix the number of channels ready
>> 
>>     We don't wait in the sem when we are doing a sync_main.  Make it
>> 
>> And we were addressing the problem that some users where finding that we
>> were busy waiting on that loop.
>
> Juan,
>
> I can understand why send_pages needs that sem, but not when sync main.
> IOW, why multifd_send_sync_main() needs:
>
>         qemu_sem_wait(&multifd_send_state->channels_ready);
>
> If it has:
>
>         qemu_sem_wait(&p->sem_sync);
>
> How does a busy loop happen?

What does multifd_send_thread() do for a SYNC packet?

static void *multifd_send_thread(void *opaque)
{
    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            ....
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
    }
}

I have simplified it a lot, but you get the idea.

See the first post of channels_ready: we do it for every packet sent,
even for the SYNC ones.

Now, what does multifd_send_pages() do?

static int multifd_send_pages(QEMUFile *f)
{
    qemu_sem_wait(&multifd_send_state->channels_ready);
    ....
}

See, we are decreasing the count of channels_ready because we know we
are using one channel.

As we are also sending packets for multifd_send_sync_main(), we either
need a hack in multifd_send_thread() to say that sync packets don't
count, or we need to decrement that semaphore in multifd_send_sync_main():

int multifd_send_sync_main(QEMUFile *f)
{
    ....
    for (i = 0; i < migrate_multifd_channels(); i++) {
        qemu_sem_wait(&multifd_send_state->channels_ready);
        ...
    }
}

And that is what we do here.
We didn't have this last line before (it is not needed to make sure the
channels are ready here).

But it is needed to make sure that we keep channels_ready exact.

Later, Juan.
Juan Quintela Oct. 19, 2023, 3:18 p.m. UTC | #5
Fabiano Rosas <farosas@suse.de> wrote:
> Juan Quintela <quintela@redhat.com> writes:
>
>> Fabiano Rosas <farosas@suse.de> wrote:
>>> The channels_ready semaphore is a global variable not linked to any
>>> single multifd channel. Waiting on it only means that "some" channel
>>> has become ready to send data. Since we need to address the channels
>>> by index (multifd_send_state->params[i]), that information adds
>>> nothing of value.
>>
>> NAK.
>>
>> I disagree here O:-)
>>
>> the reason why that channel exist is for multifd_send_pages()
>>
>> And simplifying the function what it does is:
>>
>> sem_wait(channels_ready);
>>
>> for_each_channel()
>>    look if it is empty()
>>
>> But with the semaphore, we guarantee that when we go to the loop, there
>> is a channel ready, so we know we donat busy wait searching for a
>> channel that is free.
>>
>
> Ok, so that clarifies the channels_ready usage.
>
> Now, thinking out loud... can't we simply (famous last words) remove the
> "if (!p->pending_job)" line and let multifd_send_pages() prepare another
> payload for the channel? That way multifd_send_pages() could already
> return and the channel would see one more pending_job and proceed to
> send it.

No.

See the while loop in multifd_send_thread()

    while (true) {
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {

            ......
            Do things with parts of the struct that are shared with the
            migration thread
            ....
            qemu_mutex_unlock(&p->mutex);

            // Drop the lock
            // Do more things on the channel; pending_job means that
            // it is working
            // mutex dropped means that migration_thread can use the
            // shared variables, but not the channel

            // now here we decrease pending_job, so main thread can
            // change things as it wants
            // But we need to take the lock first.
            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);
            ......
        }
    }

This is a common pattern for concurrency.  To avoid having your mutex
locked for too long, you use a variable (that can only be tested/changed
while holding the lock) to indicate that the "channel" is busy even
though the struct the lock protects is not (see how we make sure that
the channel doesn't use any variable of the struct without taking the
lock).
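
A minimal sketch of the idiom, with generic names (not the actual multifd
fields):

    qemu_mutex_lock(&lock);
    busy = true;                  /* claim the shared struct */
    /* copy in/out whatever the worker needs */
    qemu_mutex_unlock(&lock);

    do_the_slow_work();           /* lock dropped: other threads may look at
                                   * the struct, but "busy" tells them not to
                                   * hand out new work */

    qemu_mutex_lock(&lock);
    busy = false;                 /* hand the struct back */
    qemu_mutex_unlock(&lock);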


> Or, since there's no resending anyway, we could dec pending_jobs earlier
> before unlocking the channel. It seems the channel could be made ready
> for another job as soon as the packet is built and the lock is released.

pending_jobs can be turned into a bool.  We just need to make sure
that we don't screw it up in _sync().

> That way we could remove the semaphore and let the mutex do the job of
> waiting for the channel to become ready.

As said, we don't want that, because channels can go at different speeds
due to factors outside of our control.

If the semaphore bothers you, you can change it to a condition
variable, but you just move the complexity from one side to the other
(the initial implementation had a condition variable, but Paolo said that
the semaphore is more efficient, so he won).
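
For illustration only, the waiting side of such a condition-variable
variant could look like this (the global lock, channel_idle_cond and
any_channel_idle() are hypothetical, they do not exist in the code):

    qemu_mutex_lock(&multifd_send_state->lock);
    while (!any_channel_idle()) {
        qemu_cond_wait(&multifd_send_state->channel_idle_cond,
                       &multifd_send_state->lock);
    }
    /* ... pick the idle channel ... */
    qemu_mutex_unlock(&multifd_send_state->lock);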

>> Notice that I fully agree that the sem is not needed for locking.
>> Locking is done with the mutex.  It is just used to make sure that we
>> don't busy loop on that loop.
>>
>> And we use a sem, because it is the easiest way to know how many
>> channels are ready (even when we only care if there is one when we
>> arrive to that code).
>
> Yep, that's fine, no objection here.
>
>>
>> We lost count of that counter, and we fixed that here:
>
> Kind of, because we still don't wait on it during cleanup. If we did,
> then we could have an assert at the end to make sure this doesn't
> regress again.
>
> And maybe use channels_ready.count for other types of introspection.

We could.

>> commit d2026ee117147893f8d80f060cede6d872ecbd7f
>> Author: Juan Quintela <quintela@redhat.com>
>> Date:   Wed Apr 26 12:20:36 2023 +0200
>>
>>     multifd: Fix the number of channels ready
>>
>>     We don't wait in the sem when we are doing a sync_main.  Make it
>>
>> And we were addressing the problem that some users where finding that we
>> were busy waiting on that loop.
>>
>>> The channel being addressed is not necessarily the
>>> one that just released the semaphore.
>>
>> We only care that there is at least one free.  We are going to search
>> the next one.
>>
>> Does this explanation makes sense?
>
> It does, thanks for taking the time to educate us =)
>
> I made some suggestions above, but I might be missing something still.

I think that the current code is already quite efficient.

But I will have to think about whether that improves anything major.

Later, Juan.
Peter Xu Oct. 19, 2023, 3:46 p.m. UTC | #6
On Thu, Oct 19, 2023 at 05:00:02PM +0200, Juan Quintela wrote:
> Peter Xu <peterx@redhat.com> wrote:
> > Fabiano,
> >
> > Sorry to look at this series late; I messed up my inbox after I reworked my
> > arrangement methodology of emails. ;)
> >
> > On Thu, Oct 19, 2023 at 11:06:06AM +0200, Juan Quintela wrote:
> >> Fabiano Rosas <farosas@suse.de> wrote:
> >> > The channels_ready semaphore is a global variable not linked to any
> >> > single multifd channel. Waiting on it only means that "some" channel
> >> > has become ready to send data. Since we need to address the channels
> >> > by index (multifd_send_state->params[i]), that information adds
> >> > nothing of value.
> >> 
> >> NAK.
> >> 
> >> I disagree here O:-)
> >> 
> >> the reason why that channel exist is for multifd_send_pages()
> >> 
> >> And simplifying the function what it does is:
> >> 
> >> sem_wait(channels_ready);
> >> 
> >> for_each_channel()
> >>    look if it is empty()
> >> 
> >> But with the semaphore, we guarantee that when we go to the loop, there
> >> is a channel ready, so we know we donat busy wait searching for a
> >> channel that is free.
> >> 
> >> Notice that I fully agree that the sem is not needed for locking.
> >> Locking is done with the mutex.  It is just used to make sure that we
> >> don't busy loop on that loop.
> >> 
> >> And we use a sem, because it is the easiest way to know how many
> >> channels are ready (even when we only care if there is one when we
> >> arrive to that code).
> >> 
> >> We lost count of that counter, and we fixed that here:
> >> 
> >> commit d2026ee117147893f8d80f060cede6d872ecbd7f
> >> Author: Juan Quintela <quintela@redhat.com>
> >> Date:   Wed Apr 26 12:20:36 2023 +0200
> >> 
> >>     multifd: Fix the number of channels ready
> >> 
> >>     We don't wait in the sem when we are doing a sync_main.  Make it
> >> 
> >> And we were addressing the problem that some users where finding that we
> >> were busy waiting on that loop.
> >
> > Juan,
> >
> > I can understand why send_pages needs that sem, but not when sync main.
> > IOW, why multifd_send_sync_main() needs:
> >
> >         qemu_sem_wait(&multifd_send_state->channels_ready);
> >
> > If it has:
> >
> >         qemu_sem_wait(&p->sem_sync);
> >
> > How does a busy loop happen?
> 
> What does multifd_send_thread() for a SYNC packet.
> 
> static void *multifd_send_thread(void *opaque)
> {
>     while (true) {
>         qemu_sem_post(&multifd_send_state->channels_ready);
>         qemu_sem_wait(&p->sem);
> 
>         qemu_mutex_lock(&p->mutex);
> 
>         if (p->pending_job) {
>             ....
>             qemu_mutex_unlock(&p->mutex);
> 
>             if (flags & MULTIFD_FLAG_SYNC) {
>                 qemu_sem_post(&p->sem_sync);
>             }
>     }
> }
> 
> I have simplified it a lot, but yot the idea.
> 
> See the 1st post of channel_ready().
> We do it for every packet sent.  Even for the SYNC ones.
> 
> Now what multifd_send_page() does?
> 
> static int multifd_send_pages(QEMUFile *f)
> {
>     qemu_sem_wait(&multifd_send_state->channels_ready);
>     ....
> }
> 
> See, we are decreasing the numbers of channels_ready because we know we
> are using one.
> 
> As we are sending packets for multifd_send_sync_main(), we need to do a
> hack in multifd_send_thread() and say that sync packets don't
> account. Or we need to decrease that semaphore in multifd_send_sync_main()
> 
> int multifd_send_sync_main(QEMUFile *f)
> {
>     ....
>     for (i = 0; i < migrate_multifd_channels(); i++) {
>         qemu_sem_wait(&multifd_send_state->channels_ready);
>         ...
>     }
> }
> 
> And that is what we do here.
> We didn't had this last line (not needed for making sure the channels
> are ready here).
> 
> But needed to make sure that we are maintaining channels_ready exact.

I didn't expect it to be exact; I think that's the major part of the
confusion.  For example, I see this comment:

static void *multifd_send_thread(void *opaque)
       ...
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }

So do we have spurious wakeups anywhere for either p->sem or channels_ready?
They are related, because if we get spurious p->sem wakeups, then we'll
also boost channels_ready one more time there.

I think two ways to go here:

  - If we want to make them all exact: we'd figure out where are spurious
    wake ups and we fix all of them.  Or,

  - IMHO we can also make them not exact.  It means they can allow
    spurious wakeups, and the code can still work like that.  For example,
    what happens if we get a spurious wakeup in multifd_send_pages() for
    channels_ready?  We simply burn some CPU cycles, as long as we double
    check each channel again; we can even do better: if we loop over the N
    channels and see all of them busy, "goto retry" and wait on the sem
    again (a rough sketch follows).
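
For instance (illustrative only, not the current code):

  retry:
    qemu_sem_wait(&multifd_send_state->channels_ready);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        p = &multifd_send_state->params[i];
        qemu_mutex_lock(&p->mutex);
        if (!p->pending_job) {
            p->pending_job++;
            qemu_mutex_unlock(&p->mutex);
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    if (i == migrate_multifd_channels()) {
        /* all channels busy: the wakeup was spurious, wait again */
        goto retry;
    }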

What do you think?

Thanks,
Fabiano Rosas Oct. 19, 2023, 3:56 p.m. UTC | #7
Juan Quintela <quintela@redhat.com> writes:

> Fabiano Rosas <farosas@suse.de> wrote:
>> Juan Quintela <quintela@redhat.com> writes:
>>
>>> Fabiano Rosas <farosas@suse.de> wrote:
>>>> The channels_ready semaphore is a global variable not linked to any
>>>> single multifd channel. Waiting on it only means that "some" channel
>>>> has become ready to send data. Since we need to address the channels
>>>> by index (multifd_send_state->params[i]), that information adds
>>>> nothing of value.
>>>
>>> NAK.
>>>
>>> I disagree here O:-)
>>>
>>> the reason why that channel exist is for multifd_send_pages()
>>>
>>> And simplifying the function what it does is:
>>>
>>> sem_wait(channels_ready);
>>>
>>> for_each_channel()
>>>    look if it is empty()
>>>
>>> But with the semaphore, we guarantee that when we go to the loop, there
>>> is a channel ready, so we know we donat busy wait searching for a
>>> channel that is free.
>>>
>>
>> Ok, so that clarifies the channels_ready usage.
>>
>> Now, thinking out loud... can't we simply (famous last words) remove the
>> "if (!p->pending_job)" line and let multifd_send_pages() prepare another
>> payload for the channel? That way multifd_send_pages() could already
>> return and the channel would see one more pending_job and proceed to
>> send it.
>
> No.
>
> See the while loop in multifd_send_thread()
>
>     while (true) {
>         qemu_mutex_lock(&p->mutex);
>
>         if (p->pending_job) {
>
>             ......
>             Do things with parts of the struct that are shared with the
>             migration thread
>             ....
>             qemu_mutex_unlock(&p->mutex);
>
>             // Drop the lock
>             // Do mothing things on the channel, pending_job means that
>             // it is working
>             // mutex dropped means that migration_thread can use the
>             // shared variables, but not the channel
>
>             // now here we decrease pending_job, so main thread can
>             // change things as it wants
>             // But we need to take the lock first.
>             qemu_mutex_lock(&p->mutex);
>             p->pending_job--;
>             qemu_mutex_unlock(&p->mutex);
>             ......
>         }
>     }
>
> This is a common pattern for concurrency.  To not have your mutex locked
> too long, you put a variable (that can only be tested/changed with the
> lock) to explain that the "channel" is busy, the struct that lock
> protects is not (see how we make sure that the channel don't use any
> variable of the struct without the locking).

Sure, but what is the purpose of marking the channel as busy? The migration
thread cannot access p->packet anyway. From multifd_send_pages()'s
perspective, as soon as the channel releases the lock to start the IO,
the packet has been sent. It could start preparing the next pages
struct while the channel is doing IO. No?

We don't touch p after the IO aside from p->pending_jobs-- and we
already distribute the load uniformly by incrementing next_channel.

I'm not saying this would be more performant, just wondering if it would
be possible.

>
>
>> Or, since there's no resending anyway, we could dec pending_jobs earlier
>> before unlocking the channel. It seems the channel could be made ready
>> for another job as soon as the packet is built and the lock is released.
>
> pending_jobs can be transformed in a bool.  We just need to make sure
> that we didn't screw it in _sync().
>
>> That way we could remove the semaphore and let the mutex do the job of
>> waiting for the channel to become ready.
>
> As said, we don't want that.  Because channels can go a different speeds
> due to factors outside of our control.
>
> If the semaphore bothers you, you can change it to to a condition
> variable, but you just move the complexity from one side to the other
> (Initial implementation had a condition variable, but Paolo said that
> the semaphore is more efficient, so he won)

Oh, it doesn't bother me. I'm just trying to unequivocally understand
its effects. And if it logically follows that it's not necessary, only
then remove it.
Juan Quintela Oct. 19, 2023, 6:28 p.m. UTC | #8
Peter Xu <peterx@redhat.com> wrote:
> On Thu, Oct 19, 2023 at 05:00:02PM +0200, Juan Quintela wrote:
>> Peter Xu <peterx@redhat.com> wrote:
>> > Fabiano,
>> >
>> > Sorry to look at this series late; I messed up my inbox after I reworked my
>> > arrangement methodology of emails. ;)
>> >
>> > On Thu, Oct 19, 2023 at 11:06:06AM +0200, Juan Quintela wrote:
>> >> Fabiano Rosas <farosas@suse.de> wrote:
>> >> > The channels_ready semaphore is a global variable not linked to any
>> >> > single multifd channel. Waiting on it only means that "some" channel
>> >> > has become ready to send data. Since we need to address the channels
>> >> > by index (multifd_send_state->params[i]), that information adds
>> >> > nothing of value.

>> And that is what we do here.
>> We didn't had this last line (not needed for making sure the channels
>> are ready here).
>> 
>> But needed to make sure that we are maintaining channels_ready exact.
>
> I didn't expect it to be exact, I think that's the major part of confusion.
> For example, I see this comment:
>
> static void *multifd_send_thread(void *opaque)
>        ...
>         } else {
>             qemu_mutex_unlock(&p->mutex);
>             /* sometimes there are spurious wakeups */
>         }

I put that there during development, and left it there just to be safe.
Years later I put an assert() there and did lots of migrations; it was
never hit.

> So do we have spurious wakeup anywhere for either p->sem or channels_ready?
> They are related, because if we got spurious p->sem wakeups, then we'll
> boost channels_ready one more time too there.

I think that we can change that to a g_assert_not_reached().
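
I.e. something like (sketch):

        } else {
            /* p->sem should only be posted with a pending_job queued or
             * with exiting set; anything else is a bug */
            qemu_mutex_unlock(&p->mutex);
            g_assert_not_reached();
        }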

> I think two ways to go here:
>
>   - If we want to make them all exact: we'd figure out where are spurious
>     wake ups and we fix all of them.  Or,

This one.

>   - IMHO we can also make them not exact.  It means they can allow
>     spurious, and code can actually also work like that.  One example is
>     e.g. what happens if we get spurious wakeup in multifd_send_pages() for
>     channels_ready?  We simply do some cpu loops as long as we double check
>     with each channel again, we can even do better that if looping over N
>     channels and see all busy, "goto retry" and wait on the sem again.
>
> What do you think?

Make sure that it is exact O:-)

Later, Juan.
Juan Quintela Oct. 19, 2023, 6:41 p.m. UTC | #9
Fabiano Rosas <farosas@suse.de> wrote:
> Juan Quintela <quintela@redhat.com> writes:
>
>> Fabiano Rosas <farosas@suse.de> wrote:
>>> Juan Quintela <quintela@redhat.com> writes:
>>>
>>
>> This is a common pattern for concurrency.  To not have your mutex locked
>> too long, you put a variable (that can only be tested/changed with the
>> lock) to explain that the "channel" is busy, the struct that lock
>> protects is not (see how we make sure that the channel don't use any
>> variable of the struct without the locking).
>
> Sure, but what purpose is to mark the channel as busy? The migration
> thread cannot access the p->packet anyway. From multifd_send_pages()
> perspective, as soon as the channel releases the lock to start with the
> IO, the packet has been sent. It could start preparing the next pages
> struct while the channel is doing IO. No?

OK, suppose we remove pending_job.
Then we are sending that packet.

But see what happens in multifd_send_pages():

channels_ready is 0.
channel 1 is busy.
next_channel == 1
channel 0 gets ready, so it increases channels_ready.

static int multifd_send_pages(QEMUFile *f)
{

    qemu_sem_wait(&multifd_send_state->channels_ready);
    // we pass this

    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        // remember that i == 1

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);

// We choose 1, to send the packet through it.
// channel 1 is busy.
// channel 0 is idle but receives no work.
    }
...
}

So the variable is there to differentiate which channels are busy/idle,
so that we send the work to the idle channels.

> We don't touch p after the IO aside from p->pending_jobs-- and we
> already distribute the load uniformly by incrementing next_channel.

I know.  After multifd_send_thread() releases the mutex it will only
touch ->pending_job (taking the mutex 1st).

> I'm not saying this would be more performant, just wondering if it would
> be possible.

Yeap, but as said before quite suboptimal.

>> As said, we don't want that.  Because channels can go a different speeds
>> due to factors outside of our control.
>>
>> If the semaphore bothers you, you can change it to to a condition
>> variable, but you just move the complexity from one side to the other
>> (Initial implementation had a condition variable, but Paolo said that
>> the semaphore is more efficient, so he won)
>
> Oh, it doesn't bother me. I'm just trying to unequivocally understand
> it's effects. And if it logically follows that it's not necessary, only
> then remove it.

Both channels_ready and pending_job make the scheme more performant.
Without them it will not fail, it will just work way slower.

In the example I just showed you, if we always started from channel 0
to search for an idle channel, we would do even worse (that would be an
actual error):

start with channels_ready == 0.
channel 1 gets ready, so it increases channels_ready.

static int multifd_send_pages(QEMUFile *f)
{
    qemu_sem_wait(&multifd_send_state->channels_ready);
    // we pass this

    for (i = 0;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        // remember that i == 0

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        // As there is no test to see if this is idle, we put the page here
        qemu_mutex_unlock(&p->mutex);

        // We put here the page info
    }
...
}

channel 2 gets ready, so it increases channels_ready

static int multifd_send_pages(QEMUFile *f)
{
    qemu_sem_wait(&multifd_send_state->channels_ready);
    // we pass this

    for (i = 0;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        // remember that i == 0

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        // As there is no test to see if this is idle, we put the page here
        qemu_mutex_unlock(&p->mutex);

        // We put here the page info
        // channel 0 is still transmitting the 1st page
        // And overwrote the previous page info
   }
...
}

In this particular case, using next_channel in round robin would have
saved us.  When you put info somewhere for a channel to consume
asynchronously, you need some way to mark that the channel has finished
using the data before ordering it to do more work.

We can change pending_job to a bool if you prefer.  I think that we
have nailed all the off-by-one errors by now (famous last words).

Later, Juan.
Peter Xu Oct. 19, 2023, 6:50 p.m. UTC | #10
On Thu, Oct 19, 2023 at 08:28:05PM +0200, Juan Quintela wrote:
> Peter Xu <peterx@redhat.com> wrote:
> > On Thu, Oct 19, 2023 at 05:00:02PM +0200, Juan Quintela wrote:
> >> Peter Xu <peterx@redhat.com> wrote:
> >> > Fabiano,
> >> >
> >> > Sorry to look at this series late; I messed up my inbox after I reworked my
> >> > arrangement methodology of emails. ;)
> >> >
> >> > On Thu, Oct 19, 2023 at 11:06:06AM +0200, Juan Quintela wrote:
> >> >> Fabiano Rosas <farosas@suse.de> wrote:
> >> >> > The channels_ready semaphore is a global variable not linked to any
> >> >> > single multifd channel. Waiting on it only means that "some" channel
> >> >> > has become ready to send data. Since we need to address the channels
> >> >> > by index (multifd_send_state->params[i]), that information adds
> >> >> > nothing of value.
> 
> >> And that is what we do here.
> >> We didn't had this last line (not needed for making sure the channels
> >> are ready here).
> >> 
> >> But needed to make sure that we are maintaining channels_ready exact.
> >
> > I didn't expect it to be exact, I think that's the major part of confusion.
> > For example, I see this comment:
> >
> > static void *multifd_send_thread(void *opaque)
> >        ...
> >         } else {
> >             qemu_mutex_unlock(&p->mutex);
> >             /* sometimes there are spurious wakeups */
> >         }
> 
> I put that there during development, and let it there just to be safe.
> Years later I put an assert() there and did lots of migrations, never
> hit it.
> 
> > So do we have spurious wakeup anywhere for either p->sem or channels_ready?
> > They are related, because if we got spurious p->sem wakeups, then we'll
> > boost channels_ready one more time too there.
> 
> I think that we can change that for g_assert_not_reached()

Sounds good.  We can also use an error_report_once(), depending on your
confidence in that. :)  Dropping that comment definitely helps.

I had a quick look; indeed I think it's safe even with the assert.  We may
want to put some more comments on when one should kick p->sem; IIUC it can
only be kicked when either (1) pending_job is increased, or (2) exiting is
set to 1.  Then it all seems guaranteed.

Thanks,
Peter Xu Oct. 19, 2023, 7:04 p.m. UTC | #11
On Thu, Oct 19, 2023 at 08:41:26PM +0200, Juan Quintela wrote:
> We can changing pending_job to a bool if you preffer.  I think that we
> have nailed all the off_by_one errors by now (famous last words).

Would it work to make pending_job a bool, even with SYNC?  It seems to me
multifd_send_sync_main() now can boost pending_job even if pending_job==1.

That's also the place I find really confusing: it looks like the
migration thread can modify a pending job's flags as long as it is fast
enough, before the send thread puts that job onto the wire.  Then it's
unpredictable whether the SYNC flag will be sent with the current packet
(which, due to pending_jobs==1 already, can contain valid pages), or will
only be set for the next one (which will have 0 real pages).

IMHO it'll be good to separate the sync task, then we can change
pending_jobs to bool.  Something like:

  bool pending_send_page;
  bool pending_send_sync;

Then multifd_send_thread() handles them separately, only attaching
p->flags=SYNC when pending_send_sync is requested.  It guarantees a SYNC
message will always be a separate packet, which will be crystal clear then.
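
In outline, the dispatch in multifd_send_thread() could then be something
like this (a sketch using the proposed field names; locking simplified,
the real code drops the mutex around the actual write):

    qemu_mutex_lock(&p->mutex);
    if (p->pending_send_page) {
        /* fill and send a normal data packet, without the SYNC flag */
        p->pending_send_page = false;
    } else if (p->pending_send_sync) {
        /* send an empty packet with MULTIFD_FLAG_SYNC set */
        p->pending_send_sync = false;
        qemu_sem_post(&p->sem_sync);
    }
    qemu_mutex_unlock(&p->mutex);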
Juan Quintela Oct. 20, 2023, 7:53 a.m. UTC | #12
Peter Xu <peterx@redhat.com> wrote:
> On Thu, Oct 19, 2023 at 08:41:26PM +0200, Juan Quintela wrote:
>> We can changing pending_job to a bool if you preffer.  I think that we
>> have nailed all the off_by_one errors by now (famous last words).
>
> Would it work to make pending_job a bool, even with SYNC?  It seems to me
> multifd_send_sync_main() now can boost pending_job even if pending_job==1.

Then an int is ok, I think.

> That's also the place where I really think confusing too; where it looks
> like the migration thread can modify a pending job's flag as long as it is
> fast enough before the send thread put that onto the wire.

It never does.

    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        qemu_mutex_lock(&p->mutex);
        ...
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }

If pending_job == 0 -> owner of the channel is migration_thread and it
can use it.

If pending_job > 0 -> owner of the channel is the channel thread and
migration_thread can't use it.

I think that this is easy to understand.  You are right that it is not
_explained_.  And clearly, if you have to ask, it is not obvious O:-)

Yes, it was obvious to me; that is the reason why I wrote it in the 1st
place.  Notice also that it is a common idiom in multithreaded apps: it
allows the thread to do stuff without having to hold a mutex, so other
threads can "look" into the state.

> Then it's
> unpredictable whether the SYNC flag will be sent with current packet (where
> due to pending_jobs==1 already, can contain valid pages), or will be only
> set for the next one (where there will have 0 real page).

I have to think about this one.
Decrease pending_jobs there if we are doing multiple jobs?
But we still have the issue of the semaphore.

> IMHO it'll be good to separate the sync task, then we can change
> pending_jobs to bool.  Something like:
>
>   bool pending_send_page;
>   bool pending_send_sync;

current code:

        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags;
            p->normal_num = 0;

            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }

            if (p->normal_num) {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            flags = p->flags;
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                               p->next_packet_size);

            if (use_zero_copy_send) {
                /* Send header first, without zerocopy */
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
            } else {
                /* Send header using the same writev call */
                p->iov[0].iov_len = p->packet_len;
                p->iov[0].iov_base = p->packet;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.transferred,
                       p->next_packet_size + p->packet_len);
            p->next_packet_size = 0;
            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }

Your suggested change:

       qemu_mutex_lock(&p->mutex);

       if (p->pending_job_page) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags;
            p->normal_num = 0;

            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }

            if (p->normal_num) {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            flags = p->flags;
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                               p->next_packet_size);

            if (use_zero_copy_send) {
                /* Send header first, without zerocopy */
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
            } else {
                /* Send header using the same writev call */
                p->iov[0].iov_len = p->packet_len;
                p->iov[0].iov_base = p->packet;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.transferred,
                       p->next_packet_size + p->packet_len);
            p->next_packet_size = 0;
            qemu_mutex_lock(&p->mutex);
            p->pending_job_page = false;
            qemu_mutex_unlock(&p->mutex);

       } else if (p->pending_job_sync) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags;
            p->normal_num = 0;

            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            multifd_send_fill_packet(p);
            flags = p->flags;
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                               p->next_packet_size);

            if (use_zero_copy_send) {
                /* Send header first, without zerocopy */
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
            } else {
                /* Send header using the same writev call */
                p->iov[0].iov_len = p->packet_len;
                p->iov[0].iov_base = p->packet;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.transferred,
                       p->next_packet_size + p->packet_len);
            p->next_packet_size = 0;
            qemu_mutex_lock(&p->mutex);
            p->pending_job_sync = false;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }

I.e. we duplicate much more code than we remove.  I am not convinced.


> Then multifd_send_thread() handles them separately, only attaching
> p->flags=SYNC when pending_send_sync is requested.  It guarantees a SYNC
> message will always be a separate packet, which will be crystal clear then.

This is not a requirement.
The code should handle the reception of SYNC together with a page.  We
just don't send them that way because it is more complex.
Juan Quintela Oct. 20, 2023, 7:56 a.m. UTC | #13
Peter Xu <peterx@redhat.com> wrote:
> On Thu, Oct 19, 2023 at 08:28:05PM +0200, Juan Quintela wrote:
>> Peter Xu <peterx@redhat.com> wrote:
>> > On Thu, Oct 19, 2023 at 05:00:02PM +0200, Juan Quintela wrote:
>> >> Peter Xu <peterx@redhat.com> wrote:
>> >> > Fabiano,
>> >> >
>> >> > Sorry to look at this series late; I messed up my inbox after I reworked my
>> >> > arrangement methodology of emails. ;)
>> >> >
>> >> > On Thu, Oct 19, 2023 at 11:06:06AM +0200, Juan Quintela wrote:
>> >> >> Fabiano Rosas <farosas@suse.de> wrote:
>> >> >> > The channels_ready semaphore is a global variable not linked to any
>> >> >> > single multifd channel. Waiting on it only means that "some" channel
>> >> >> > has become ready to send data. Since we need to address the channels
>> >> >> > by index (multifd_send_state->params[i]), that information adds
>> >> >> > nothing of value.
>> 
>> >> And that is what we do here.
>> >> We didn't had this last line (not needed for making sure the channels
>> >> are ready here).
>> >> 
>> >> But needed to make sure that we are maintaining channels_ready exact.
>> >
>> > I didn't expect it to be exact, I think that's the major part of confusion.
>> > For example, I see this comment:
>> >
>> > static void *multifd_send_thread(void *opaque)
>> >        ...
>> >         } else {
>> >             qemu_mutex_unlock(&p->mutex);
>> >             /* sometimes there are spurious wakeups */
>> >         }
>> 
>> I put that there during development, and let it there just to be safe.
>> Years later I put an assert() there and did lots of migrations, never
>> hit it.
>> 
>> > So do we have spurious wakeup anywhere for either p->sem or channels_ready?
>> > They are related, because if we got spurious p->sem wakeups, then we'll
>> > boost channels_ready one more time too there.
>> 
>> I think that we can change that for g_assert_not_reached()
>
> Sounds good.  We can also use an error_erport_once(), depending on your
> confidence of that. :)  Dropping that comment definitely helps.
>
> I had a quick look, indeed I think it's safe even with assert.  We may want
> to put some more comment on when one should kick p->sem; IIUC it can only
> be kicked in either (1) pending_job increased, or (2) set exiting=1.  Then
> it seems all guaranteed.

I think we can change the end of the loop from:

            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }

to:

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
        }
        qemu_mutex_unlock(&p->mutex);


And call it a day.  But we can leave one assert there.

But I would prefer to do this kind of locking change at the beginning
of the next cycle.

Later, Juan.
Fabiano Rosas Oct. 20, 2023, 12:48 p.m. UTC | #14
Juan Quintela <quintela@redhat.com> writes:

> Peter Xu <peterx@redhat.com> wrote:
>> On Thu, Oct 19, 2023 at 08:41:26PM +0200, Juan Quintela wrote:
>>> We can changing pending_job to a bool if you preffer.  I think that we
>>> have nailed all the off_by_one errors by now (famous last words).
>>
>> Would it work to make pending_job a bool, even with SYNC?  It seems to me
>> multifd_send_sync_main() now can boost pending_job even if pending_job==1.
>
> Then a int is ok, I think.
>
>> That's also the place where I really think confusing too; where it looks
>> like the migration thread can modify a pending job's flag as long as it is
>> fast enough before the send thread put that onto the wire.
>
> It never does.
>
>     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>         qemu_mutex_lock(&p->mutex);
>         ...
>         if (!p->pending_job) {
>             p->pending_job++;
>             next_channel = (i + 1) % migrate_multifd_channels();
>             break;
>         }
>         qemu_mutex_unlock(&p->mutex);
>     }

We copy the flags into the packet header at multifd_send_fill_packet()
before unlocking. I think that is even independent of the pending_jobs
scheme.

> If pending_job == 0 -> owner of the channel is migration_thread and it
> can use it.
>
> If pending_job > 0 -> owner of the channel is the channel thread and
> migration_thread can't use it.

Do you really mean "migration_thread" here or just multifd_send_pages()?
Because multifd_send_sync_main() doesn't care about this ownership
rule. Does that mean that code is incorrect?

> I think that this is easy to understand.  You are right that it is not
> _explained_.  And clearly, if you have to ask, it is not obvious O:-)

It is explained in multifd.h. But it doesn't really matter, because code
can drift and documentation doesn't ensure correctness. That's why we
have to ask about seemingly obvious stuff.

> Yes, it was obvious to me, that is the reason why I wrote it on the
> 1st place.  Notice also that it is a common idiom in multithreaded
> apps.  That allows it to do stuff without having to have a mutex
> locked, so other threads can "look" into the state.
>> Then it's
>> unpredictable whether the SYNC flag will be sent with the current packet
>> (which, because pending_jobs==1 already, can contain valid pages), or will
>> only be set for the next one (which will contain 0 real pages).
>
> I have to think about this one.
> Decrease pending_jobs there if we are doing multiple jobs?
> But we still have the issue of the semaphore.
>
>> IMHO it'll be good to separate the sync task, then we can change
>> pending_jobs to bool.  Something like:
>>
>>   bool pending_send_page;
>>   bool pending_send_sync;
>
> current code:
>
>         qemu_mutex_lock(&p->mutex);
>
>         if (p->pending_job) {
>             uint64_t packet_num = p->packet_num;
>             uint32_t flags;
>             p->normal_num = 0;
>
>             if (use_zero_copy_send) {
>                 p->iovs_num = 0;
>             } else {
>                 p->iovs_num = 1;
>             }
>
>             for (int i = 0; i < p->pages->num; i++) {
>                 p->normal[p->normal_num] = p->pages->offset[i];
>                 p->normal_num++;
>             }
>
>             if (p->normal_num) {
>                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
>                 if (ret != 0) {
>                     qemu_mutex_unlock(&p->mutex);
>                     break;
>                 }
>             }
>             multifd_send_fill_packet(p);
>             flags = p->flags;
>             p->flags = 0;
>             p->num_packets++;
>             p->total_normal_pages += p->normal_num;
>             p->pages->num = 0;
>             p->pages->block = NULL;
>             qemu_mutex_unlock(&p->mutex);
>
>             trace_multifd_send(p->id, packet_num, p->normal_num, flags,
>                                p->next_packet_size);
>
>             if (use_zero_copy_send) {
>                 /* Send header first, without zerocopy */
>                 ret = qio_channel_write_all(p->c, (void *)p->packet,
>                                             p->packet_len, &local_err);
>                 if (ret != 0) {
>                     break;
>                 }
>             } else {
>                 /* Send header using the same writev call */
>                 p->iov[0].iov_len = p->packet_len;
>                 p->iov[0].iov_base = p->packet;
>             }
>
>             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
>                                               0, p->write_flags, &local_err);
>             if (ret != 0) {
>                 break;
>             }
>
>             stat64_add(&mig_stats.multifd_bytes,
>                        p->next_packet_size + p->packet_len);
>             stat64_add(&mig_stats.transferred,
>                        p->next_packet_size + p->packet_len);
>             p->next_packet_size = 0;
>             qemu_mutex_lock(&p->mutex);
>             p->pending_job--;
>             qemu_mutex_unlock(&p->mutex);
>
>             if (flags & MULTIFD_FLAG_SYNC) {
>                 qemu_sem_post(&p->sem_sync);
>             }
>         } else {
>             qemu_mutex_unlock(&p->mutex);
>             /* sometimes there are spurious wakeups */
>         }
>
> Your suggested change:
>
>        qemu_mutex_lock(&p->mutex);
>
>        if (p->pending_job_page) {

It's a semantic issue really, but I'd rather we avoid locking ourselves
more into the "pages" idea for multifd threads. The data being sent by
the multifd thread should be opaque.
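
As a sketch of what a payload-agnostic split could look like (the field
names below are hypothetical, not taken from either series, and the locking
around the actual I/O is simplified):

    /*
     * Hypothetical MultiFDSendParams fields, protected by p->mutex; the
     * migration thread "owns" the channel only while both are false.
     */
    bool pending_job;    /* an opaque payload is queued for this channel */
    bool pending_sync;   /* a standalone SYNC packet was requested */

        /* send thread dispatch */
        qemu_mutex_lock(&p->mutex);
        if (p->pending_job) {
            /* fill and send a packet from the queued opaque payload */
            p->pending_job = false;
        } else if (p->pending_sync) {
            /* send an empty packet with MULTIFD_FLAG_SYNC set */
            p->pending_sync = false;
            qemu_sem_post(&p->sem_sync);
        }
        qemu_mutex_unlock(&p->mutex);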
Peter Xu Oct. 22, 2023, 8:17 p.m. UTC | #15
On Fri, Oct 20, 2023 at 09:48:54AM -0300, Fabiano Rosas wrote:
> > If pending_job == 0 -> owner of the channel is migration_thread and it
> > can use it.
> >
> > If pending_job > 0 -> owner of the channel is the channel thread and
> > migration_thread can't use it.
> 
> Do you really mean "migration_thread" here or just multifd_send_pages()?
> Because multifd_send_sync_main() doesn't care about this ownership
> rule. Does that mean that code is incorrect?

Yes, that's also what I was referring to as the confusion.

[...]

> It's a semantic issue really, but I'd rather we avoid locking ourselves
> more into the "pages" idea for multifd threads. The data being sent by
> the multifd thread should be opaque.

I've put these ideas into an RFC patchset here:

[PATCH RFC 0/7] migration/multifd: quit unitifications and separate sync packet

I kept it "pending_job" there, avoid using "pages" as a name.

Fabiano, I have a patch there that dropped p->quit, so there will be
crossovers with your patchset.  I tried to leave that alone, but found I'd
better clean that up when adding the send thread helpers.  Let's see how it
goes..

Thanks,
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 0f6b203877..e26f5f246d 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -362,8 +362,6 @@  struct {
     MultiFDPages_t *pages;
     /* global number of generated multifd packets */
     uint64_t packet_num;
-    /* send channels ready */
-    QemuSemaphore channels_ready;
     /*
      * Have we already run terminate threads.  There is a race when it
      * happens that we got one error while we are exiting.
@@ -403,7 +401,6 @@  static int multifd_send_pages(QEMUFile *f)
         return -1;
     }
 
-    qemu_sem_wait(&multifd_send_state->channels_ready);
     /*
      * next_channel can remain from a previous migration that was
      * using more channels, so ensure it doesn't overflow if the
@@ -554,7 +551,6 @@  void multifd_save_cleanup(void)
             error_free(local_err);
         }
     }
-    qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
     multifd_pages_clear(multifd_send_state->pages);
@@ -630,7 +626,6 @@  int multifd_send_sync_main(QEMUFile *f)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_sem_wait(&multifd_send_state->channels_ready);
         trace_multifd_send_sync_main_wait(p->id);
         qemu_sem_wait(&p->sem_sync);
 
@@ -664,7 +659,6 @@  static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
-        qemu_sem_post(&multifd_send_state->channels_ready);
         qemu_sem_wait(&p->sem);
 
         if (qatomic_read(&multifd_send_state->exiting)) {
@@ -759,7 +753,6 @@  out:
      */
     if (ret != 0) {
         qemu_sem_post(&p->sem_sync);
-        qemu_sem_post(&multifd_send_state->channels_ready);
     }
 
     qemu_mutex_lock(&p->mutex);
@@ -796,7 +789,6 @@  static void multifd_tls_outgoing_handshake(QIOTask *task,
          * is not created, and then tell who pay attention to me.
          */
         p->quit = true;
-        qemu_sem_post(&multifd_send_state->channels_ready);
         qemu_sem_post(&p->sem_sync);
     }
 }
@@ -874,7 +866,6 @@  static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
 {
      migrate_set_error(migrate_get_current(), err);
      /* Error happen, we need to tell who pay attention to me */
-     qemu_sem_post(&multifd_send_state->channels_ready);
      qemu_sem_post(&p->sem_sync);
      /*
       * Although multifd_send_thread is not created, but main migration
@@ -919,7 +910,6 @@  int multifd_save_setup(Error **errp)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->pages = multifd_pages_init(page_count);
-    qemu_sem_init(&multifd_send_state->channels_ready, 0);
     qatomic_set(&multifd_send_state->exiting, 0);
     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];