diff mbox series

[v12,17/21] migration: Create ram_multifd_page

Message ID 20180425112723.1111-18-quintela@redhat.com
State New
Headers show
Series Multifd | expand

Commit Message

Juan Quintela April 25, 2018, 11:27 a.m. UTC
The function still doesn't use multifd, but we have simplified
ram_save_page: the xbzrle and RDMA stuff is gone.  We have added a new
counter.

Signed-off-by: Juan Quintela <quintela@redhat.com>

--
Add last_page parameter
Add comments for done and address
Remove multifd field, it is the same as normal pages
Merge next patch, now we send multiple pages at a time
Remove counter for multifd pages, it is identical to normal pages
Use iovec's instead of creating the equivalent.
Clear memory used by pages (dave)
Use g_new0(danp)
define MULTIFD_CONTINUE
now pages member is a pointer
Fix off-by-one in number of pages in one packet
Remove RAM_SAVE_FLAG_MULTIFD_PAGE
s/multifd_pages_t/MultiFDPages_t/
---
 migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)

Comments

Peter Xu April 26, 2018, 7:43 a.m. UTC | #1
On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:

[...]

> +static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
> +{
> +    MultiFDPages_t *pages = multifd_send_state->pages;
> +
> +    if (!pages->block) {
> +        pages->block = block;
> +    }
> +
> +    if (pages->block == block) {
> +        pages->offset[pages->used] = offset;
> +        pages->iov[pages->used].iov_base = block->host + offset;
> +        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
> +        pages->used++;
> +
> +        if (pages->used < pages->allocated) {
> +            return;
> +        }
> +    }
> +
> +    multifd_send_pages();
> +
> +    if (pages->block != block) {
> +        multifd_queue_page(block, offset);

Nits: could we avoid the recursive call here?  E.g.:

multifd_queue_page()
{
  /* flush pages if necessary */
  if (pages->block && (pages->block != block ||
                       pages->used >= pages->allocated))
    multifd_send_pages();

  if (!pages->block)
    pages->block = block;

  pages->offset[pages->used] = ...
  pages->iov[pages->used].iov_base = ...
  pages->iov[pages->used].iov_len = ...
  pages->used++;
}

> +    }
> +}
> +
>  static void multifd_send_terminate_threads(Error *err)
>  {
>      int i;
> @@ -746,6 +804,7 @@ int multifd_save_cleanup(Error **errp)
>          g_free(p->packet);
>          p->packet = NULL;
>      }
> +    qemu_sem_destroy(&multifd_send_state->channels_ready);
>      qemu_sem_destroy(&multifd_send_state->sem_sync);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> @@ -763,12 +822,17 @@ static void multifd_send_sync_main(void)
>      if (!migrate_use_multifd()) {
>          return;
>      }
> +    if (multifd_send_state->pages->used) {
> +        multifd_send_pages();
> +    }
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          trace_multifd_send_sync_main_signal(p->id);
>  
>          qemu_mutex_lock(&p->mutex);
> +        multifd_send_state->seq++;
> +        p->seq = multifd_send_state->seq;
>          p->flags |= MULTIFD_FLAG_SYNC;
>          p->pending_job++;
>          qemu_mutex_unlock(&p->mutex);
> @@ -824,6 +888,7 @@ static void *multifd_send_thread(void *opaque)
>              if (flags & MULTIFD_FLAG_SYNC) {
>                  qemu_sem_post(&multifd_send_state->sem_sync);
>              }
> +            qemu_sem_post(&multifd_send_state->channels_ready);
>          } else if (p->quit) {
>              qemu_mutex_unlock(&p->mutex);
>              break;
> @@ -883,6 +948,7 @@ int multifd_save_setup(void)
>      atomic_set(&multifd_send_state->count, 0);
>      multifd_pages_init(&multifd_send_state->pages, page_count);
>      qemu_sem_init(&multifd_send_state->sem_sync, 0);
> +    qemu_sem_init(&multifd_send_state->channels_ready, 0);
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> @@ -1576,6 +1642,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
>      return pages;
>  }
>  
> +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
> +                            bool last_stage)
> +{

Maybe name it as ram_send_multifd_page()? :)

So that it can be aligned with the rest like ram_save_page() and
ram_save_compressed_page().

> +    int pages;
> +    uint8_t *p;
> +    RAMBlock *block = pss->block;
> +    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
> +
> +    p = block->host + offset;
> +
> +    pages = save_zero_page(rs, block, offset);

I suspect the series will need to rebase to Dave's next pull request
(or after its merging) since Guangrong's work should have refactored
this part in the coming pull request...

Thanks,

> +    if (pages == -1) {
> +        ram_counters.transferred +=
> +            save_page_header(rs, rs->f, block,
> +                             offset | RAM_SAVE_FLAG_PAGE);
> +        multifd_queue_page(block, offset);
> +        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
> +        ram_counters.transferred += TARGET_PAGE_SIZE;
> +        pages = 1;
> +        ram_counters.normal++;
> +    }
> +
> +    return pages;
> +}
> +
>  static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
>                                  ram_addr_t offset)
>  {
> @@ -2004,6 +2095,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
>          if (migrate_use_compression() &&
>              (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
>              res = ram_save_compressed_page(rs, pss, last_stage);
> +        } else if (migrate_use_multifd()) {
> +            res = ram_multifd_page(rs, pss, last_stage);
>          } else {
>              res = ram_save_page(rs, pss, last_stage);
>          }
> -- 
> 2.17.0
>
Peter Xu April 26, 2018, 8:18 a.m. UTC | #2
On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:
> The function still don't use multifd, but we have simplified
> ram_save_page, xbzrle and RDMA stuff is gone.  We have added a new
> counter.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> 
> --
> Add last_page parameter
> Add commets for done and address
> Remove multifd field, it is the same than normal pages
> Merge next patch, now we send multiple pages at a time
> Remove counter for multifd pages, it is identical to normal pages
> Use iovec's instead of creating the equivalent.
> Clear memory used by pages (dave)
> Use g_new0(danp)
> define MULTIFD_CONTINUE
> now pages member is a pointer
> Fix off-by-one in number of pages in one packet
> Remove RAM_SAVE_FLAG_MULTIFD_PAGE
> s/multifd_pages_t/MultiFDPages_t/
> ---
>  migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 93 insertions(+)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 398cb0af3b..862ec53d32 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -54,6 +54,7 @@
>  #include "migration/block.h"
>  #include "sysemu/sysemu.h"
>  #include "qemu/uuid.h"
> +#include "qemu/iov.h"
>  
>  /***********************************************************/
>  /* ram save/restore */
> @@ -692,8 +693,65 @@ struct {
>      QemuSemaphore sem_sync;
>      /* global number of generated multifd packets */
>      uint32_t seq;
> +    /* send channels ready */
> +    QemuSemaphore channels_ready;
>  } *multifd_send_state;
>  
> +static void multifd_send_pages(void)
> +{
> +    int i;
> +    static int next_channel;
> +    MultiFDSendParams *p = NULL; /* make happy gcc */
> +    MultiFDPages_t *pages = multifd_send_state->pages;
> +
> +    qemu_sem_wait(&multifd_send_state->channels_ready);

This sem is posted when a thread has finished its work.  However this
is called in the main migration thread.  If with this line, are the
threads really sending things in parallel?  Since it looks to me that
this function (and the main thread) won't send the 2nd page array if
the 1st hasn't finished, and won't send the 3rd if the 2nd hasn't,
vice versa...

Maybe I misunderstood something.  Please feel free to correct me.

> +    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        p = &multifd_send_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        if (!p->pending_job) {
> +            p->pending_job++;
> +            next_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    p->pages->used = 0;
> +    multifd_send_state->seq++;
> +    p->seq = multifd_send_state->seq;
> +    p->pages->block = NULL;
> +    multifd_send_state->pages = p->pages;
> +    p->pages = pages;

Here we directly replaced MultiFDSendParams.pages with
multifd_send_state->pages.  Then are we always using a single
MultiFDPages_t struct?  And if so, will all the initial
MultiFDSendParams.pages memory leaked without freed?

> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +}

Thanks,
Dr. David Alan Gilbert May 3, 2018, 11:30 a.m. UTC | #3
* Peter Xu (peterx@redhat.com) wrote:
> On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:
> > The function still don't use multifd, but we have simplified
> > ram_save_page, xbzrle and RDMA stuff is gone.  We have added a new
> > counter.
> > 
> > Signed-off-by: Juan Quintela <quintela@redhat.com>
> > 
> > --
> > Add last_page parameter
> > Add commets for done and address
> > Remove multifd field, it is the same than normal pages
> > Merge next patch, now we send multiple pages at a time
> > Remove counter for multifd pages, it is identical to normal pages
> > Use iovec's instead of creating the equivalent.
> > Clear memory used by pages (dave)
> > Use g_new0(danp)
> > define MULTIFD_CONTINUE
> > now pages member is a pointer
> > Fix off-by-one in number of pages in one packet
> > Remove RAM_SAVE_FLAG_MULTIFD_PAGE
> > s/multifd_pages_t/MultiFDPages_t/
> > ---
> >  migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 93 insertions(+)
> > 
> > diff --git a/migration/ram.c b/migration/ram.c
> > index 398cb0af3b..862ec53d32 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -54,6 +54,7 @@
> >  #include "migration/block.h"
> >  #include "sysemu/sysemu.h"
> >  #include "qemu/uuid.h"
> > +#include "qemu/iov.h"
> >  
> >  /***********************************************************/
> >  /* ram save/restore */
> > @@ -692,8 +693,65 @@ struct {
> >      QemuSemaphore sem_sync;
> >      /* global number of generated multifd packets */
> >      uint32_t seq;
> > +    /* send channels ready */
> > +    QemuSemaphore channels_ready;
> >  } *multifd_send_state;
> >  
> > +static void multifd_send_pages(void)
> > +{
> > +    int i;
> > +    static int next_channel;
> > +    MultiFDSendParams *p = NULL; /* make happy gcc */
> > +    MultiFDPages_t *pages = multifd_send_state->pages;
> > +
> > +    qemu_sem_wait(&multifd_send_state->channels_ready);
> 
> This sem is posted when a thread has finished its work.  However this
> is called in the main migration thread.  If with this line, are the
> threads really sending things in parallel?  Since it looks to me that
> this function (and the main thread) won't send the 2nd page array if
> the 1st hasn't finished, and won't send the 3rd if the 2nd hasn't,
> vice versa...
> 
> Maybe I misunderstood something.  Please feel free to correct me.

I share a similar misunderstanding;  except I can't understand how the
first item ever gets sent if we're waiting for channels_ready.
I think I could have understood it if there was a sem_post at the top of
multifd_send_thread.

Dave

> > +    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> > +        p = &multifd_send_state->params[i];
> > +
> > +        qemu_mutex_lock(&p->mutex);
> > +        if (!p->pending_job) {
> > +            p->pending_job++;
> > +            next_channel = (i + 1) % migrate_multifd_channels();
> > +            break;
> > +        }
> > +        qemu_mutex_unlock(&p->mutex);
> > +    }
> > +    p->pages->used = 0;
> > +    multifd_send_state->seq++;
> > +    p->seq = multifd_send_state->seq;
> > +    p->pages->block = NULL;
> > +    multifd_send_state->pages = p->pages;
> > +    p->pages = pages;
> 
> Here we directly replaced MultiFDSendParams.pages with
> multifd_send_state->pages.  Then are we always using a single
> MultiFDPages_t struct?  And if so, will all the initial
> MultiFDSendParams.pages memory leaked without freed?
> 
> > +    qemu_mutex_unlock(&p->mutex);
> > +    qemu_sem_post(&p->sem);
> > +}
> 
> Thanks,
> 
> -- 
> Peter Xu
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela May 23, 2018, 11:13 a.m. UTC | #4
Peter Xu <peterx@redhat.com> wrote:
> On Wed, Apr 25, 2018 at 01:27:19PM +0200, Juan Quintela wrote:
>> The function still don't use multifd, but we have simplified
>> ram_save_page, xbzrle and RDMA stuff is gone.  We have added a new
>> counter.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> 
>> --
>> Add last_page parameter
>> Add commets for done and address
>> Remove multifd field, it is the same than normal pages
>> Merge next patch, now we send multiple pages at a time
>> Remove counter for multifd pages, it is identical to normal pages
>> Use iovec's instead of creating the equivalent.
>> Clear memory used by pages (dave)
>> Use g_new0(danp)
>> define MULTIFD_CONTINUE
>> now pages member is a pointer
>> Fix off-by-one in number of pages in one packet
>> Remove RAM_SAVE_FLAG_MULTIFD_PAGE
>> s/multifd_pages_t/MultiFDPages_t/
>> ---
>>  migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++
>>  1 file changed, 93 insertions(+)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 398cb0af3b..862ec53d32 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -54,6 +54,7 @@
>>  #include "migration/block.h"
>>  #include "sysemu/sysemu.h"
>>  #include "qemu/uuid.h"
>> +#include "qemu/iov.h"
>>  
>>  /***********************************************************/
>>  /* ram save/restore */
>> @@ -692,8 +693,65 @@ struct {
>>      QemuSemaphore sem_sync;
>>      /* global number of generated multifd packets */
>>      uint32_t seq;
>> +    /* send channels ready */
>> +    QemuSemaphore channels_ready;
>>  } *multifd_send_state;
>>  
>> +static void multifd_send_pages(void)
>> +{
>> +    int i;
>> +    static int next_channel;
>> +    MultiFDSendParams *p = NULL; /* make happy gcc */
>> +    MultiFDPages_t *pages = multifd_send_state->pages;
>> +
>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>
> This sem is posted when a thread has finished its work.  However this
> is called in the main migration thread.  If with this line, are the
> threads really sending things in parallel?  Since it looks to me that
> this function (and the main thread) won't send the 2nd page array if
> the 1st hasn't finished, and won't send the 3rd if the 2nd hasn't,
> vice versa...
>
> Maybe I misunderstood something.  Please feel free to correct me.

@@ -824,6 +888,7 @@ static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_send_state->sem_sync);
             }
+            qemu_sem_post(&multifd_send_state->channels_ready);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;


Notice this bit on multifd_send_thread.  We are adding one to channels_ready.
But we can enter there for two reasons:
- we need to send a new packet full of pages
- we need to send a syncronization packet.
  this is what happens when we start.

Before the main thread start, all the other channels have to be created,
so we have that semaphore initialized to the right number of channels.

How do you prefer this to be documented?

Later, Juan.



>> +    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> +        p = &multifd_send_state->params[i];
>> +
>> +        qemu_mutex_lock(&p->mutex);
>> +        if (!p->pending_job) {
>> +            p->pending_job++;
>> +            next_channel = (i + 1) % migrate_multifd_channels();
>> +            break;
>> +        }
>> +        qemu_mutex_unlock(&p->mutex);
>> +    }
>> +    p->pages->used = 0;
>> +    multifd_send_state->seq++;
>> +    p->seq = multifd_send_state->seq;
>> +    p->pages->block = NULL;
>> +    multifd_send_state->pages = p->pages;
>> +    p->pages = pages;

[1]

>
> Here we directly replaced MultiFDSendParams.pages with
> multifd_send_state->pages.  Then are we always using a single
> MultiFDPages_t struct?  And if so, will all the initial
> MultiFDSendParams.pages memory leaked without freed?

multifd_send_state->pages is stored in the pages variable.
We stored it at [1].

What we have (sending side) is:

- 1 multifd_pages by channel
- 1 multifd_pages by main thread

What we do here is:

  pages = multifd_send_state->pages;
  multifd_send_state->pages = channel[i]->pages;
  channel[i]->pages = pages;

So we are just doing a swap.  We do that through the whole loop to have
smaller names (i.e. basically we do everything over pages->foo), but
the idea is what I put there. (Ok, what I called channel[i] is "p").

But everywhere on that file (compression threads and multifd ones) use p
to mean the parameters of a thread.

Later, Juan.
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 398cb0af3b..862ec53d32 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -54,6 +54,7 @@ 
 #include "migration/block.h"
 #include "sysemu/sysemu.h"
 #include "qemu/uuid.h"
+#include "qemu/iov.h"
 
 /***********************************************************/
 /* ram save/restore */
@@ -692,8 +693,65 @@  struct {
     QemuSemaphore sem_sync;
     /* global number of generated multifd packets */
     uint32_t seq;
+    /* send channels ready */
+    QemuSemaphore channels_ready;
 } *multifd_send_state;
 
+static void multifd_send_pages(void)
+{
+    int i;
+    static int next_channel;
+    MultiFDSendParams *p = NULL; /* make happy gcc */
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+        p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        if (!p->pending_job) {
+            p->pending_job++;
+            next_channel = (i + 1) % migrate_multifd_channels();
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+    }
+    p->pages->used = 0;
+    multifd_send_state->seq++;
+    p->seq = multifd_send_state->seq;
+    p->pages->block = NULL;
+    multifd_send_state->pages = p->pages;
+    p->pages = pages;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
+static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
+{
+    MultiFDPages_t *pages = multifd_send_state->pages;
+
+    if (!pages->block) {
+        pages->block = block;
+    }
+
+    if (pages->block == block) {
+        pages->offset[pages->used] = offset;
+        pages->iov[pages->used].iov_base = block->host + offset;
+        pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
+        pages->used++;
+
+        if (pages->used < pages->allocated) {
+            return;
+        }
+    }
+
+    multifd_send_pages();
+
+    if (pages->block != block) {
+        multifd_queue_page(block, offset);
+    }
+}
+
 static void multifd_send_terminate_threads(Error *err)
 {
     int i;
@@ -746,6 +804,7 @@  int multifd_save_cleanup(Error **errp)
         g_free(p->packet);
         p->packet = NULL;
     }
+    qemu_sem_destroy(&multifd_send_state->channels_ready);
     qemu_sem_destroy(&multifd_send_state->sem_sync);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -763,12 +822,17 @@  static void multifd_send_sync_main(void)
     if (!migrate_use_multifd()) {
         return;
     }
+    if (multifd_send_state->pages->used) {
+        multifd_send_pages();
+    }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         trace_multifd_send_sync_main_signal(p->id);
 
         qemu_mutex_lock(&p->mutex);
+        multifd_send_state->seq++;
+        p->seq = multifd_send_state->seq;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
         qemu_mutex_unlock(&p->mutex);
@@ -824,6 +888,7 @@  static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&multifd_send_state->sem_sync);
             }
+            qemu_sem_post(&multifd_send_state->channels_ready);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -883,6 +948,7 @@  int multifd_save_setup(void)
     atomic_set(&multifd_send_state->count, 0);
     multifd_pages_init(&multifd_send_state->pages, page_count);
     qemu_sem_init(&multifd_send_state->sem_sync, 0);
+    qemu_sem_init(&multifd_send_state->channels_ready, 0);
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -1576,6 +1642,31 @@  static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
     return pages;
 }
 
+static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss,
+                            bool last_stage)
+{
+    int pages;
+    uint8_t *p;
+    RAMBlock *block = pss->block;
+    ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
+
+    p = block->host + offset;
+
+    pages = save_zero_page(rs, block, offset);
+    if (pages == -1) {
+        ram_counters.transferred +=
+            save_page_header(rs, rs->f, block,
+                             offset | RAM_SAVE_FLAG_PAGE);
+        multifd_queue_page(block, offset);
+        qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
+        ram_counters.transferred += TARGET_PAGE_SIZE;
+        pages = 1;
+        ram_counters.normal++;
+    }
+
+    return pages;
+}
+
 static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
                                 ram_addr_t offset)
 {
@@ -2004,6 +2095,8 @@  static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
         if (migrate_use_compression() &&
             (rs->ram_bulk_stage || !migrate_use_xbzrle())) {
             res = ram_save_compressed_page(rs, pss, last_stage);
+        } else if (migrate_use_multifd()) {
+            res = ram_multifd_page(rs, pss, last_stage);
         } else {
             res = ram_save_page(rs, pss, last_stage);
         }