diff mbox series

[RFC,v2,6/6] migration/multifd: Bring back the 'ready' semaphore

Message ID 20231012140651.13122-7-farosas@suse.de
State New
Headers show
Series migration/multifd: Locking changes | expand

Commit Message

Fabiano Rosas Oct. 12, 2023, 2:06 p.m. UTC
Bring back the 'ready' semaphore, but this time make it per-channel,
so that we can do true lockstep switching of MultiFDPages without
taking the channel lock.

Drop the channel lock as it now becomes useless. The rules for
accessing the MultiFDSendParams are:

- between sem and sem_done/ready if we're the channel

    qemu_sem_post(&p->ready);
    qemu_sem_wait(&p->sem);
    <owns p>
    qemu_sem_post(&p->sem_done);

- between ready and sem if we're not the channel

    qemu_sem_wait(&p->ready);
    <owns p>
    qemu_sem_post(&p->sem);

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
One issue I can see with this is that we might now have to wait at
multifd_send_pages() if a channel takes too long to send its packet
and come back to p->ready. We would need to find a way of ignoring a
slow channel and skipping ahead to the next one in line.
---
 migration/multifd.c | 45 +++++++++++++--------------------------------
 migration/multifd.h |  5 ++---
 2 files changed, 15 insertions(+), 35 deletions(-)

Comments

Juan Quintela Oct. 19, 2023, 10:43 a.m. UTC | #1
Fabiano Rosas <farosas@suse.de> wrote:
> Bring back the 'ready' semaphore, but this time make it per-channel,
> so that we can do true lockstep switching of MultiFDPages without
> taking the channel lock.
>
> Drop the channel lock as it now becomes useless. The rules for
> accessing the MultiFDSendParams are:
>
> - between sem and sem_done/ready if we're the channel
>
>     qemu_sem_post(&p->ready);
>     qemu_sem_wait(&p->sem);
>     <owns p>
>     qemu_sem_post(&p->sem_done);
>
> - between ready and sem if we're not the channel
>
>     qemu_sem_wait(&p->ready);
>     <owns p>
>     qemu_sem_post(&p->sem);
>
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> One issue I can see with this is that we might now have to wait at
> multifd_send_pages() if a channel takes too long to send its packet
> and come back to p->ready. We would need to find a way of ignoring a
> slow channel and skipping ahead to the next one in line.

See my 1st patch in the series.  That is exactly what the channels_ready
sem does.  If your network is faster than your multifd channels can
write, the main thread never waits on channels_ready, because it is
always positive.

In case there are no channels ready, we wait on the sem instead of
busy waiting.

And searching for the channel that is ready is really fast:

static int multifd_send_pages(QEMUFile *f)
{
    [...]

    qemu_sem_wait(&multifd_send_state->channels_ready);
    // taking a sem that is positive is basically 1 instruction

    next_channel %= migrate_multifd_channels();
    // we do crude load balancing here.
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        qemu_mutex_lock(&p->mutex);  // 1 lock instruction
        if (p->quit) {               // 1 check
            ....
        }
        if (!p->pending_job) {       // 1 check
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);  // 1 unlock (another instruction)
    }
    ....

So checking a channel is:
- taking a mutex that is almost always free.
- 2 checks
- unlock the mutex

So I can't really see the need to optimize this.

Notice that your scheme only has real advantages if:
- all channels are busy
- the channel that becomes ready is the one just before next_channel
  in the search order

And in that case, I think that the solution is to have more channels or
faster networking.

Later, Juan.
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index b7ba3fe0e6..7fa7bc33fd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -410,10 +410,10 @@  static int multifd_send_pages(QEMUFile *f)
     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
         p = &multifd_send_state->params[i];
 
-        qemu_mutex_lock(&p->mutex);
+        qemu_sem_wait(&p->ready);
+
         if (p->quit) {
             error_report("%s: channel %d has already quit!", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
             return -1;
         }
         if (!p->pending_job) {
@@ -421,7 +421,6 @@  static int multifd_send_pages(QEMUFile *f)
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
-        qemu_mutex_unlock(&p->mutex);
     }
     assert(!p->pages->num);
     assert(!p->pages->block);
@@ -429,7 +428,6 @@  static int multifd_send_pages(QEMUFile *f)
     p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
-    qemu_mutex_unlock(&p->mutex);
     qemu_sem_post(&p->sem);
 
     return 1;
@@ -529,9 +527,9 @@  void multifd_save_cleanup(void)
         }
         socket_send_channel_destroy(p->c);
         p->c = NULL;
-        qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
         qemu_sem_destroy(&p->sem_done);
+        qemu_sem_destroy(&p->ready);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -586,14 +584,12 @@  static void multifd_send_wait(void)
          * Even idle channels will wait for p->sem at the top of the
          * loop.
          */
+        qemu_sem_wait(&p->ready);
         qemu_sem_post(&p->sem);
 
         trace_multifd_send_wait(migrate_multifd_channels() - i);
         qemu_sem_wait(&p->sem_done);
-
-        qemu_mutex_lock(&p->mutex);
         assert(!p->pending_job || p->quit);
-        qemu_mutex_unlock(&p->mutex);
     }
 
     /*
@@ -621,20 +617,17 @@  int multifd_send_sync_main(QEMUFile *f)
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
+        qemu_sem_wait(&p->ready);
         trace_multifd_send_sync_main_signal(p->id);
 
-        qemu_mutex_lock(&p->mutex);
-
         if (p->quit) {
             error_report("%s: channel %d has already quit", __func__, i);
-            qemu_mutex_unlock(&p->mutex);
             return -1;
         }
 
         p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
-        qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
 
@@ -685,15 +678,14 @@  static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
+        qemu_sem_post(&p->ready);
         qemu_sem_wait(&p->sem);
 
         if (qatomic_read(&multifd_send_state->exiting)) {
-            qemu_mutex_lock(&p->mutex);
             p->quit = true;
-            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->sem_done);
             break;
         }
-        qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
@@ -714,7 +706,6 @@  static void *multifd_send_thread(void *opaque)
             if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
                 if (ret != 0) {
-                    qemu_mutex_unlock(&p->mutex);
                     break;
                 }
             }
@@ -725,7 +716,6 @@  static void *multifd_send_thread(void *opaque)
             p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
-            qemu_mutex_unlock(&p->mutex);
 
             trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                                p->next_packet_size);
@@ -753,12 +743,9 @@  static void *multifd_send_thread(void *opaque)
 
             stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
             stat64_add(&mig_stats.transferred, p->next_packet_size);
-            qemu_mutex_lock(&p->mutex);
             p->pending_job--;
-            qemu_mutex_unlock(&p->mutex);
 
         } else {
-            qemu_mutex_unlock(&p->mutex);
             qemu_sem_post(&p->sem_done);
         }
     }
@@ -766,11 +753,8 @@  static void *multifd_send_thread(void *opaque)
 out:
     if (local_err) {
         trace_multifd_send_error(p->id);
-
-        qemu_mutex_lock(&p->mutex);
         p->quit = true;
-        qemu_mutex_unlock(&p->mutex);
-
+        qemu_sem_post(&p->ready);
         multifd_send_terminate_threads(local_err);
         error_free(local_err);
     }
@@ -780,12 +764,10 @@  out:
      * who pay attention to me.
      */
     if (ret != 0) {
-        qemu_sem_post(&p->sem_done);
+        p->quit = true;
+        qemu_sem_post(&p->ready);
     }
-
-    qemu_mutex_lock(&p->mutex);
     p->running = false;
-    qemu_mutex_unlock(&p->mutex);
 
     rcu_unregister_thread();
     migration_threads_remove(thread);
@@ -817,7 +799,7 @@  static void multifd_tls_outgoing_handshake(QIOTask *task,
          * is not created, and then tell who pay attention to me.
          */
         p->quit = true;
-        qemu_sem_post(&p->sem_done);
+        qemu_sem_post(&p->ready);
     }
 }
 
@@ -893,14 +875,13 @@  static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
                                              QIOChannel *ioc, Error *err)
 {
      migrate_set_error(migrate_get_current(), err);
-     /* Error happen, we need to tell who pay attention to me */
-     qemu_sem_post(&p->sem_done);
      /*
       * Although multifd_send_thread is not created, but main migration
       * thread need to judge whether it is running, so we need to mark
       * its status.
       */
      p->quit = true;
+     qemu_sem_post(&p->ready);
      object_unref(OBJECT(ioc));
      error_free(err);
 }
@@ -944,9 +925,9 @@  int multifd_save_setup(Error **errp)
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_done, 0);
+        qemu_sem_init(&p->ready, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
diff --git a/migration/multifd.h b/migration/multifd.h
index 71bd66974d..6bb10b07aa 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -93,8 +93,8 @@  typedef struct {
     /* channel is done transmitting until more pages are queued */
     QemuSemaphore sem_done;
 
-    /* this mutex protects the following parameters */
-    QemuMutex mutex;
+    QemuSemaphore ready;
+
     /* is this channel thread running */
     bool running;
     /* should this thread finish */
@@ -209,4 +209,3 @@  typedef struct {
 void multifd_register_ops(int method, MultiFDMethods *ops);
 
 #endif
-