On Thu, Jul 28, 2011 at 4:53 PM, Marcelo Tosatti <mtosa...@redhat.com> wrote:
> On Wed, Jul 27, 2011 at 02:44:48PM +0100, Stefan Hajnoczi wrote:
>> For leaf images with copy-on-read semantics, the stream command allows
>> the user to populate the image file by copying data from the backing
>> file while the guest is running. Once all blocks have been streamed,
>> the dependency on the original backing file is removed. Therefore,
>> stream commands can be used to implement post-copy live block migration
>> and rapid deployment.
>>
>> The command synopsis is:
>>
>> block_stream
>> ------------
>>
>> Copy data from a backing file into a block device.
>>
>> The block streaming operation is performed in the background until the
>> entire backing file has been copied. This command returns immediately
>> once streaming has started. The status of ongoing block streaming
>> operations can be checked with query-block-jobs. The operation can be
>> stopped before it has completed using the block_job_cancel command.
>>
>> If a base file is specified then sectors are not copied from that base
>> file and its backing chain. When streaming completes the image file
>> will have the base file as its backing file. This can be used to stream
>> a subset of the backing file chain instead of flattening the entire
>> image.
>>
>> On successful completion the image file is updated to drop the backing
>> file.
>>
>> Arguments:
>>
>> - device: device name (json-string)
>> - base: common backing file (json-string, optional)
>>
>> Errors:
>>
>> DeviceInUse: streaming is already active on this device
>> DeviceNotFound: device name is invalid
>> NotSupported: image streaming is not supported by this device
>>
>> Events:
>>
>> On completion the BLOCK_JOB_COMPLETED event is raised with the following
>> fields:
>>
>> - type: job type ("stream" for image streaming, json-string)
>> - device: device name (json-string)
>> - end: maximum progress value (json-int)
>> - position: current progress value (json-int)
>> - speed: rate limit, bytes per second (json-int)
>> - error: error message (json-string, only on error)
>>
>> The completion event is raised both on success and on failure. On
>> success position is equal to end. On failure position and end can be
>> used to indicate at which point the operation failed.
>>
>> On failure the error field contains a human-readable error message.
>> There are no semantics other than that streaming has failed and clients
>> should not try to interpret the error string.
>>
>> Examples:
>>
>> -> { "execute": "block_stream", "arguments": { "device": "virtio0" } }
>> <- { "return": {} }
>>
>> Signed-off-by: Adam Litke <a...@us.ibm.com>
>> Signed-off-by: Stefan Hajnoczi <stefa...@linux.vnet.ibm.com>
>> ---
>>  blockdev.c      |  133 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>  blockdev.h      |    1 +
>>  hmp-commands.hx |   14 ++++++
>>  monitor.c       |    3 +
>>  monitor.h       |    1 +
>>  qerror.c        |    8 +++
>>  qerror.h        |    6 +++
>>  qmp-commands.hx |   64 ++++++++++++++++++++++++++
>>  8 files changed, 230 insertions(+), 0 deletions(-)
>>
>> diff --git a/blockdev.c b/blockdev.c
>> index b337732..cd5e49c 100644
>> --- a/blockdev.c
>> +++ b/blockdev.c
>> @@ -16,6 +16,7 @@
>>  #include "sysemu.h"
>>  #include "hw/qdev.h"
>>  #include "block_int.h"
>> +#include "qjson.h"
>>
>>  static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);
>>
>> @@ -50,6 +51,131 @@ static const int if_max_devs[IF_COUNT] = {
>>      [IF_SCSI] = 7,
>>  };
>>
>> +typedef struct StreamState {
>> +    int64_t offset; /* current position in block device */
>> +    BlockDriverState *bs;
>> +    QEMUTimer *timer;
>> +    QLIST_ENTRY(StreamState) list;
>> +} StreamState;
>> +
>> +static QLIST_HEAD(, StreamState) block_streams =
>> +    QLIST_HEAD_INITIALIZER(block_streams);
>> +
>> +static QObject *stream_get_qobject(StreamState *s)
>> +{
>> +    const char *name = bdrv_get_device_name(s->bs);
>> +    int64_t len = bdrv_getlength(s->bs);
>> +
>> +    return qobject_from_jsonf("{ 'device': %s, 'type': 'stream', "
>> +                              "'offset': %" PRId64 ", 'len': %" PRId64 ", "
>> +                              "'speed': %" PRId64 " }",
>> +                              name, s->offset, len, (int64_t)0);
>> +}
>> +
>> +static void stream_mon_event(StreamState *s, int ret)
>> +{
>> +    QObject *data = stream_get_qobject(s);
>> +
>> +    if (ret < 0) {
>> +        QDict *qdict = qobject_to_qdict(data);
>> +
>> +        qdict_put(qdict, "error", qstring_from_str(strerror(-ret)));
>> +    }
>> +
>> +    monitor_protocol_event(QEVENT_BLOCK_JOB_COMPLETED, data);
>> +    qobject_decref(data);
>> +}
>> +
>> +static void stream_free(StreamState *s)
>> +{
>> +    QLIST_REMOVE(s, list);
>> +
>> +    qemu_del_timer(s->timer);
>> +    qemu_free_timer(s->timer);
>> +    qemu_free(s);
>> +}
>> +
>> +static void stream_complete(StreamState *s, int ret)
>> +{
>> +    stream_mon_event(s, ret);
>> +    stream_free(s);
>> +}
>> +
>> +static void stream_cb(void *opaque, int nb_sectors)
>> +{
>> +    StreamState *s = opaque;
>> +
>> +    if (nb_sectors < 0) {
>> +        stream_complete(s, nb_sectors);
>> +        return;
>> +    }
>> +
>> +    s->offset += nb_sectors * BDRV_SECTOR_SIZE;
>> +
>> +    if (s->offset == bdrv_getlength(s->bs)) {
>> +        bdrv_change_backing_file(s->bs, NULL, NULL);
>> +        stream_complete(s, 0);
>> +    } else {
>> +        qemu_mod_timer(s->timer, qemu_get_clock_ns(rt_clock));
>> +    }
>> +}
>> +
>> +/* We can't call bdrv_aio_stream() directly from the callback because that
>> + * makes qemu_aio_flush() not complete until the streaming is completed.
>> + * By delaying with a timer, we give qemu_aio_flush() a chance to complete.
>> + */
>> +static void stream_next_iteration(void *opaque)
>> +{
>> +    StreamState *s = opaque;
>> +
>> +    bdrv_aio_copy_backing(s->bs, s->offset / BDRV_SECTOR_SIZE, stream_cb, s);
>> +}
>
> The plan is to replace format specific code with the generic
> implementation in the future?
I think Kevin has said this will not be merged into qemu.git. But I am
sharing it as a reference implementation against which the libvirt API
works.

Next I will send out a stub implementation of the QMP/HMP block_stream
APIs without any of the block driver-specific functionality. It may
make sense to merge this into QEMU just to nail down the QMP/HMP
interface, even if it does not do anything yet (we have errors for
ENOTSUP).

>> +
>> +static StreamState *stream_find(const char *device)
>> +{
>> +    StreamState *s;
>> +
>> +    QLIST_FOREACH(s, &block_streams, list) {
>> +        if (strcmp(bdrv_get_device_name(s->bs), device) == 0) {
>> +            return s;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +static StreamState *stream_start(const char *device)
>> +{
>> +    StreamState *s;
>> +    BlockDriverAIOCB *acb;
>> +    BlockDriverState *bs;
>> +
>> +    s = stream_find(device);
>> +    if (s) {
>> +        qerror_report(QERR_DEVICE_IN_USE, device);
>> +        return NULL;
>> +    }
>> +
>> +    bs = bdrv_find(device);
>> +    if (!bs) {
>> +        qerror_report(QERR_DEVICE_NOT_FOUND, device);
>> +        return NULL;
>> +    }
>> +
>> +    s = qemu_mallocz(sizeof(*s));
>> +    s->bs = bs;
>> +    s->timer = qemu_new_timer_ns(rt_clock, stream_next_iteration, s);
>> +    QLIST_INSERT_HEAD(&block_streams, s, list);
>
> Should increase refcount with drive_get_ref().

Yes thanks.

Stefan
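
For illustration, the reference-holding pattern behind the drive_get_ref()
suggestion could look roughly like the sketch below. It is a standalone
model, not QEMU code: Drive, Stream, drive_ref(), drive_unref(),
stream_begin() and stream_end() are hypothetical stand-ins, and the only
point shown is that the streaming state takes a reference when it starts
and drops it when it is freed, so the drive cannot go away while a stream
is still using it.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for a refcounted drive; not QEMU's DriveInfo. */
typedef struct Drive {
    const char *name;
    int refcount;
} Drive;

/* Hypothetical stand-in for the per-device streaming state. */
typedef struct Stream {
    Drive *drive;
    int64_t offset; /* current position in block device */
} Stream;

static void drive_ref(Drive *d)
{
    d->refcount++;
}

static void drive_unref(Drive *d)
{
    assert(d->refcount > 0);
    d->refcount--;
}

/* Start streaming: pin the drive for the lifetime of the stream. */
static Stream *stream_begin(Drive *d)
{
    Stream *s = calloc(1, sizeof(*s));

    if (!s) {
        return NULL;
    }
    drive_ref(d);
    s->drive = d;
    return s;
}

/* Completion or cancellation: release the reference taken at start. */
static void stream_end(Stream *s)
{
    drive_unref(s->drive);
    free(s);
}
```

In the patch above, the analogous places would presumably be stream_start()
for taking the reference and stream_free() for releasing it.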