Message ID | 1360950433-17106-23-git-send-email-pbonzini@redhat.com |
---|---|
State | New |
Headers | show |
On 02/15/2013 07:46 PM, Paolo Bonzini wrote: > Buffering was needed because blocking writes could take a long time > and starve other threads seeking to grab the big QEMU mutex. > > Now that all writes (except within _complete callbacks) are done > outside the big QEMU mutex, we do not need buffering at all. > > Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> > --- > include/migration/migration.h | 3 -- > migration.c | 78 ++++++++++------------------------------ > savevm.c | 1 + > 3 files changed, 21 insertions(+), 61 deletions(-) > > diff --git a/include/migration/migration.h b/include/migration/migration.h > index d78bbbb..172ef95 100644 > --- a/include/migration/migration.h > +++ b/include/migration/migration.h > @@ -34,9 +34,6 @@ struct MigrationState > int64_t bandwidth_limit; > size_t bytes_xfer; > size_t xfer_limit; > - uint8_t *buffer; > - size_t buffer_size; > - size_t buffer_capacity; > QemuThread thread; > QEMUBH *cleanup_bh; > > diff --git a/migration.c b/migration.c > index d6a7dff..1f6fbdc 100644 > --- a/migration.c > +++ b/migration.c > @@ -503,73 +503,41 @@ int64_t migrate_xbzrle_cache_size(void) > > /* migration thread support */ > > - > -static void buffered_flush(MigrationState *s) > -{ > - size_t offset = 0; > - ssize_t ret = 0; > - > - DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size); > - > - if (qemu_file_get_error(s->file)) { > - s->buffer_size = 0; > - return; > - } > - qemu_fflush(s->file); > - > - while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) { > - size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer); > - ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send); > - if (ret <= 0) { > - DPRINTF("error flushing data, %zd\n", ret); > - break; > - } else { > - DPRINTF("flushed %zd byte(s)\n", ret); > - offset += ret; > - s->bytes_xfer += ret; > - } > - } > - > - DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size); > - memmove(s->buffer, s->buffer + offset, s->buffer_size - 
offset); > - s->buffer_size -= offset; > - > - if (ret < 0) { > - qemu_file_set_error(s->file, ret); > - } > -} > - > static int buffered_put_buffer(void *opaque, const uint8_t *buf, > int64_t pos, int size) > { > MigrationState *s = opaque; > - ssize_t error; > + ssize_t ret; > + size_t sent; > > DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); > > - error = qemu_file_get_error(s->file); > - if (error) { > + ret = qemu_file_get_error(s->file); > + if (ret) { > DPRINTF("flush when error, bailing: %s\n", strerror(-error)); you need to replace error with ret here. > - return error; > + return ret; > } > > if (size <= 0) { > return size; > } > > - if (size > (s->buffer_capacity - s->buffer_size)) { > - DPRINTF("increasing buffer capacity from %zu by %zu\n", > - s->buffer_capacity, size + 1024); > - > - s->buffer_capacity += size + 1024; > - > - s->buffer = g_realloc(s->buffer, s->buffer_capacity); > + sent = 0; > + while (size) { > + ret = migrate_fd_put_buffer(s, buf, size); > + if (ret <= 0) { > + DPRINTF("error flushing data, %zd\n", ret); > + return ret; > + } else { > + DPRINTF("flushed %zd byte(s)\n", ret); > + sent += ret; > + buf += ret; > + size -= ret; > + s->bytes_xfer += ret; > + } > } > > - memcpy(s->buffer + s->buffer_size, buf, size); > - s->buffer_size += size; > - > - return size; > + return sent; > } > > static int buffered_close(void *opaque) > @@ -691,10 +659,9 @@ static void *buffered_file_thread(void *opaque) > /* usleep expects microseconds */ > g_usleep((initial_time + BUFFER_DELAY - current_time)*1000); > } > - buffered_flush(s); > if (qemu_file_get_error(s->file)) { > __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_ERROR); > - } else if (last_round && s->buffer_size == 0) { > + } else if (last_round) { > __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_COMPLETED); > } > } > @@ -714,7 +681,6 @@ static void *buffered_file_thread(void *opaque) > qemu_bh_schedule(s->cleanup_bh); > 
qemu_mutex_unlock_iothread(); > > - g_free(s->buffer); > return NULL; > } > > @@ -731,10 +697,6 @@ void migrate_fd_connect(MigrationState *s) > { > s->state = MIG_STATE_ACTIVE; > s->bytes_xfer = 0; > - s->buffer = NULL; > - s->buffer_size = 0; > - s->buffer_capacity = 0; > - > s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO; > > s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s); > diff --git a/savevm.c b/savevm.c > index 7c7774e..ce10295 100644 > --- a/savevm.c > +++ b/savevm.c > @@ -1724,6 +1724,7 @@ void qemu_savevm_state_complete(QEMUFile *f) > } > > qemu_put_byte(f, QEMU_VM_EOF); > + qemu_fflush(f); > } > > uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size) > At last :) Reviewed-by: Orit Wasserman <owasserm@redhat.com>
Paolo Bonzini <pbonzini@redhat.com> wrote:
> Buffering was needed because blocking writes could take a long time
> and starve other threads seeking to grab the big QEMU mutex.
>
> Now that all writes (except within _complete callbacks) are done
> outside the big QEMU mutex, we do not need buffering at all.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>

Reviewed-by: Juan Quintela <quintela@redhat.com>

The DEBUG (DPRINTF) issue, where the stale `error` variable needs to be changed to `ret`, was already found by Orit.
diff --git a/include/migration/migration.h b/include/migration/migration.h index d78bbbb..172ef95 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -34,9 +34,6 @@ struct MigrationState int64_t bandwidth_limit; size_t bytes_xfer; size_t xfer_limit; - uint8_t *buffer; - size_t buffer_size; - size_t buffer_capacity; QemuThread thread; QEMUBH *cleanup_bh; diff --git a/migration.c b/migration.c index d6a7dff..1f6fbdc 100644 --- a/migration.c +++ b/migration.c @@ -503,73 +503,41 @@ int64_t migrate_xbzrle_cache_size(void) /* migration thread support */ - -static void buffered_flush(MigrationState *s) -{ - size_t offset = 0; - ssize_t ret = 0; - - DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size); - - if (qemu_file_get_error(s->file)) { - s->buffer_size = 0; - return; - } - qemu_fflush(s->file); - - while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) { - size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer); - ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send); - if (ret <= 0) { - DPRINTF("error flushing data, %zd\n", ret); - break; - } else { - DPRINTF("flushed %zd byte(s)\n", ret); - offset += ret; - s->bytes_xfer += ret; - } - } - - DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size); - memmove(s->buffer, s->buffer + offset, s->buffer_size - offset); - s->buffer_size -= offset; - - if (ret < 0) { - qemu_file_set_error(s->file, ret); - } -} - static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) { MigrationState *s = opaque; - ssize_t error; + ssize_t ret; + size_t sent; DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); - error = qemu_file_get_error(s->file); - if (error) { + ret = qemu_file_get_error(s->file); + if (ret) { DPRINTF("flush when error, bailing: %s\n", strerror(-error)); - return error; + return ret; } if (size <= 0) { return size; } - if (size > (s->buffer_capacity - s->buffer_size)) { - DPRINTF("increasing buffer 
capacity from %zu by %zu\n", - s->buffer_capacity, size + 1024); - - s->buffer_capacity += size + 1024; - - s->buffer = g_realloc(s->buffer, s->buffer_capacity); + sent = 0; + while (size) { + ret = migrate_fd_put_buffer(s, buf, size); + if (ret <= 0) { + DPRINTF("error flushing data, %zd\n", ret); + return ret; + } else { + DPRINTF("flushed %zd byte(s)\n", ret); + sent += ret; + buf += ret; + size -= ret; + s->bytes_xfer += ret; + } } - memcpy(s->buffer + s->buffer_size, buf, size); - s->buffer_size += size; - - return size; + return sent; } static int buffered_close(void *opaque) @@ -691,10 +659,9 @@ static void *buffered_file_thread(void *opaque) /* usleep expects microseconds */ g_usleep((initial_time + BUFFER_DELAY - current_time)*1000); } - buffered_flush(s); if (qemu_file_get_error(s->file)) { __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_ERROR); - } else if (last_round && s->buffer_size == 0) { + } else if (last_round) { __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE, MIG_STATE_COMPLETED); } } @@ -714,7 +681,6 @@ static void *buffered_file_thread(void *opaque) qemu_bh_schedule(s->cleanup_bh); qemu_mutex_unlock_iothread(); - g_free(s->buffer); return NULL; } @@ -731,10 +697,6 @@ void migrate_fd_connect(MigrationState *s) { s->state = MIG_STATE_ACTIVE; s->bytes_xfer = 0; - s->buffer = NULL; - s->buffer_size = 0; - s->buffer_capacity = 0; - s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO; s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s); diff --git a/savevm.c b/savevm.c index 7c7774e..ce10295 100644 --- a/savevm.c +++ b/savevm.c @@ -1724,6 +1724,7 @@ void qemu_savevm_state_complete(QEMUFile *f) } qemu_put_byte(f, QEMU_VM_EOF); + qemu_fflush(f); } uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size)
Buffering was needed because blocking writes could take a long time
and starve other threads seeking to grab the big QEMU mutex.

Now that all writes (except within _complete callbacks) are done
outside the big QEMU mutex, we do not need buffering at all.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 include/migration/migration.h |  3 --
 migration.c                   | 78 ++++++++++------------------------------
 savevm.c                      |  1 +
 3 files changed, 21 insertions(+), 61 deletions(-)