On 02/15/2013 07:46 PM, Paolo Bonzini wrote:
> Buffering was needed because blocking writes could take a long time
> and starve other threads seeking to grab the big QEMU mutex.
>
> Now that all writes (except within _complete callbacks) are done
> outside the big QEMU mutex, we do not need buffering at all.
>
> Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
> ---
> include/migration/migration.h | 3 --
> migration.c | 78 ++++++++++------------------------------
> savevm.c | 1 +
> 3 files changed, 21 insertions(+), 61 deletions(-)
>
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index d78bbbb..172ef95 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -34,9 +34,6 @@ struct MigrationState
> int64_t bandwidth_limit;
> size_t bytes_xfer;
> size_t xfer_limit;
> - uint8_t *buffer;
> - size_t buffer_size;
> - size_t buffer_capacity;
> QemuThread thread;
> QEMUBH *cleanup_bh;
>
> diff --git a/migration.c b/migration.c
> index d6a7dff..1f6fbdc 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -503,73 +503,41 @@ int64_t migrate_xbzrle_cache_size(void)
>
> /* migration thread support */
>
> -
> -static void buffered_flush(MigrationState *s)
> -{
> - size_t offset = 0;
> - ssize_t ret = 0;
> -
> - DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
> -
> - if (qemu_file_get_error(s->file)) {
> - s->buffer_size = 0;
> - return;
> - }
> - qemu_fflush(s->file);
> -
> - while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
> - size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit -
> s->bytes_xfer);
> - ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send);
> - if (ret <= 0) {
> - DPRINTF("error flushing data, %zd\n", ret);
> - break;
> - } else {
> - DPRINTF("flushed %zd byte(s)\n", ret);
> - offset += ret;
> - s->bytes_xfer += ret;
> - }
> - }
> -
> - DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
> - memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
> - s->buffer_size -= offset;
> -
> - if (ret < 0) {
> - qemu_file_set_error(s->file, ret);
> - }
> -}
> -
> static int buffered_put_buffer(void *opaque, const uint8_t *buf,
> int64_t pos, int size)
> {
> MigrationState *s = opaque;
> - ssize_t error;
> + ssize_t ret;
> + size_t sent;
>
> DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
>
> - error = qemu_file_get_error(s->file);
> - if (error) {
> + ret = qemu_file_get_error(s->file);
> + if (ret) {
> DPRINTF("flush when error, bailing: %s\n", strerror(-error));
you need to replace error with ret here.
> - return error;
> + return ret;
> }
>
> if (size <= 0) {
> return size;
> }
>
> - if (size > (s->buffer_capacity - s->buffer_size)) {
> - DPRINTF("increasing buffer capacity from %zu by %zu\n",
> - s->buffer_capacity, size + 1024);
> -
> - s->buffer_capacity += size + 1024;
> -
> - s->buffer = g_realloc(s->buffer, s->buffer_capacity);
> + sent = 0;
> + while (size) {
> + ret = migrate_fd_put_buffer(s, buf, size);
> + if (ret <= 0) {
> + DPRINTF("error flushing data, %zd\n", ret);
> + return ret;
> + } else {
> + DPRINTF("flushed %zd byte(s)\n", ret);
> + sent += ret;
> + buf += ret;
> + size -= ret;
> + s->bytes_xfer += ret;
> + }
> }
>
> - memcpy(s->buffer + s->buffer_size, buf, size);
> - s->buffer_size += size;
> -
> - return size;
> + return sent;
> }
>
> static int buffered_close(void *opaque)
> @@ -691,10 +659,9 @@ static void *buffered_file_thread(void *opaque)
> /* usleep expects microseconds */
> g_usleep((initial_time + BUFFER_DELAY - current_time)*1000);
> }
> - buffered_flush(s);
> if (qemu_file_get_error(s->file)) {
> __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE,
> MIG_STATE_ERROR);
> - } else if (last_round && s->buffer_size == 0) {
> + } else if (last_round) {
> __sync_val_compare_and_swap(&s->state, MIG_STATE_ACTIVE,
> MIG_STATE_COMPLETED);
> }
> }
> @@ -714,7 +681,6 @@ static void *buffered_file_thread(void *opaque)
> qemu_bh_schedule(s->cleanup_bh);
> qemu_mutex_unlock_iothread();
>
> - g_free(s->buffer);
> return NULL;
> }
>
> @@ -731,10 +697,6 @@ void migrate_fd_connect(MigrationState *s)
> {
> s->state = MIG_STATE_ACTIVE;
> s->bytes_xfer = 0;
> - s->buffer = NULL;
> - s->buffer_size = 0;
> - s->buffer_capacity = 0;
> -
> s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO;
>
> s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
> diff --git a/savevm.c b/savevm.c
> index 7c7774e..ce10295 100644
> --- a/savevm.c
> +++ b/savevm.c
> @@ -1724,6 +1724,7 @@ void qemu_savevm_state_complete(QEMUFile *f)
> }
>
> qemu_put_byte(f, QEMU_VM_EOF);
> + qemu_fflush(f);
> }
>
> uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size)
>
At last :)
Reviewed-by: Orit Wasserman <owass...@redhat.com>