@@ -533,20 +533,37 @@ struct MultiFDRecvParams {
QemuThread thread;
QemuCond cond;
QemuMutex mutex;
- bool quit;
int s;
+ /* proteced by param mutex */
+ bool quit;
+ uint8_t *address;
+ /* proteced by multifd mutex */
+ bool done;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
static MultiFDRecvParams *multifd_recv;
+QemuMutex multifd_recv_mutex;
+QemuCond multifd_recv_cond;
+
static void *multifd_recv_thread(void *opaque)
{
- MultiFDSendParams *params = opaque;
+ MultiFDRecvParams *params = opaque;
qemu_mutex_lock(¶ms->mutex);
while (!params->quit){
- qemu_cond_wait(¶ms->cond, ¶ms->mutex);
+ if (params->address) {
+ params->address = 0;
+ qemu_mutex_unlock(¶ms->mutex);
+ qemu_mutex_lock(&multifd_recv_mutex);
+ params->done = true;
+ qemu_cond_signal(&multifd_recv_cond);
+ qemu_mutex_unlock(&multifd_recv_mutex);
+ qemu_mutex_lock(¶ms->mutex);
+ } else {
+ qemu_cond_wait(¶ms->cond, ¶ms->mutex);
+ }
}
qemu_mutex_unlock(¶ms->mutex);
@@ -598,7 +615,9 @@ void migrate_multifd_recv_threads_create(void)
qemu_mutex_init(&multifd_recv[i].mutex);
qemu_cond_init(&multifd_recv[i].cond);
multifd_recv[i].quit = false;
+ multifd_recv[i].done = true;
multifd_recv[i].s = tcp_recv_channel_create();
+ multifd_recv[i].address = 0;
if(multifd_recv[i].s < 0) {
printf("Error creating a recv channel");
@@ -610,6 +629,27 @@ void migrate_multifd_recv_threads_create(void)
}
}
+static void multifd_recv_page(uint8_t *address, int fd_num)
+{
+ int thread_count;
+ MultiFDRecvParams *params;
+
+ thread_count = migrate_multifd_threads();
+ assert(fd_num < thread_count);
+ params = &multifd_recv[fd_num];
+
+ qemu_mutex_lock(&multifd_recv_mutex);
+ while (!params->done) {
+ qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+ }
+ params->done = false;
+ qemu_mutex_unlock(&multifd_recv_mutex);
+ qemu_mutex_lock(¶ms->mutex);
+ params->address = address;
+ qemu_cond_signal(¶ms->cond);
+ qemu_mutex_unlock(¶ms->mutex);
+}
+
/**
* save_page_header: Write page header to wire
*
@@ -2785,10 +2825,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
case RAM_SAVE_FLAG_MULTIFD_PAGE:
fd_num = qemu_get_be16(f);
- if (fd_num == fd_num) {
- /* this is yet an unused variable, changed later */
- fd_num = 0;
- }
+ multifd_recv_page(host, fd_num);
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
We make the locking and the transfer of information specific, even if we are still receiving things through the main thread. Signed-off-by: Juan Quintela <quintela@redhat.com> --- migration/ram.c | 51 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 7 deletions(-)