On Fri, Dec 23, 2016 at 05:29:01PM +0300, Vladimir Sementsov-Ogievskiy wrote: > @@ -120,11 +257,42 @@ static bool coroutine_fn yield_and_check(BackupBlockJob > *job) > uint64_t delay_ns = ratelimit_calculate_delay(&job->limit, > job->sectors_read); > job->sectors_read = 0; > + job->delayed = true; > + trace_backup_sleep_delay(block_job_is_cancelled(&job->common), > + block_job_should_pause(&job->common)); > block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, delay_ns); > + job->delayed = false; > } else { > - block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, 0); > + trace_backup_sleep_zero(block_job_is_cancelled(&job->common), > + block_job_should_pause(&job->common)); > + block_job_pause_point(&job->common); > + } > + > + trace_backup_sleep_finish(); > +} > + > +/* backup_busy_sleep > + * Just yield, without setting busy=false
How does this interact with block_job_detach_aio_context() and block_job_drain()? Jobs must reach pause points regularly so that cancellation and detaching AioContexts works. > @@ -132,69 +300,246 @@ static bool coroutine_fn > yield_and_check(BackupBlockJob *job) > return false; > } > > -static int coroutine_fn backup_do_read(BackupBlockJob *job, > - int64_t offset, unsigned int bytes, > - QEMUIOVector *qiov) > +static void backup_job_wait_workers(BackupBlockJob *job) Missing coroutine_fn. This function yields and will crash unless called from coroutine context. Please use coroutine_fn throughout the code so it's clear when a function is only allowed to be called from coroutine context. > +static void backup_worker_pause(BackupBlockJob *job) > +{ > + job->nb_busy_workers--; > + > + trace_backup_worker_pause(qemu_coroutine_self(), job->nb_busy_workers, > + job->waiting_for_workers); > + > + if (job->nb_busy_workers == 0 && job->waiting_for_workers) { > + qemu_coroutine_add_next(job->common.co); > + } > + > + qemu_co_queue_wait(&job->paused_workers); > + > + trace_backup_worker_unpause(qemu_coroutine_self()); > + > + job->nb_busy_workers++; This is similar to rwlock. The main coroutine would use wrlock and the workers would use rdlock. rwlock avoids situations where the main blockjob re-enters a worker and vice versa. qemu_coroutine_add_next() is a lower-level primitive and does not prevent this type of bug. Please use rwlock. > +static inline bool check_delay(BackupBlockJob *job) > +{ > + uint64_t delay_ns; > + > + if (!job->common.speed) { > + return false; > + } > + > + delay_ns = ratelimit_calculate_delay(&job->limit, job->sectors_read); > + job->sectors_read = 0; > + > + if (delay_ns == 0) { > + if (job->delayed) { > + job->delayed = false; > + qemu_co_queue_restart_all(&job->paused_workers); > } > + return false; > + } > > - return ret; > + return job->delayed = true; This looks like a "== vs =" bug. Please reformat it so readers don't have to puzzle out what you meant: job->delayed = true; return true; > +static void coroutine_fn backup_worker_co(void *opaque) > +{ > + BackupBlockJob *job = opaque; > + > + job->running_workers++; > + job->nb_busy_workers++; > + > + while (true) { > + int64_t cluster = backup_get_work(job); > + trace_backup_worker_got_work(job, qemu_coroutine_self(), cluster); > + > + switch (cluster) { > + case BACKUP_WORKER_STOP: > + job->nb_busy_workers--; > + job->running_workers--; > + if (job->nb_busy_workers == 0 && job->waiting_for_workers) { > + qemu_coroutine_add_next(job->common.co); Is there a reason for using qemu_coroutine_add_next() instead of qemu_coroutine_enter()? I think neither function prevents a crash if backup_get_work() returns BACKUP_WORKER_STOP and this coroutine has never yielded yet. We would try to re-enter the main blockjob coroutine. > static int coroutine_fn backup_before_write_notify( > NotifierWithReturn *notifier, > void *opaque) > { > - BackupBlockJob *job = container_of(notifier, BackupBlockJob, > before_write); > - BdrvTrackedRequest *req = opaque; > - int64_t sector_num = req->offset >> BDRV_SECTOR_BITS; > - int nb_sectors = req->bytes >> BDRV_SECTOR_BITS; > + BdrvTrackedRequest *tr = opaque; > + NotifierRequest *nr; > + BackupBlockJob *job = (BackupBlockJob *)tr->bs->job; > + int64_t start = tr->offset / job->cluster_size; > + int64_t end = DIV_ROUND_UP(tr->offset + tr->bytes, job->cluster_size); > + int ret = 0; > + > + assert((tr->offset & (BDRV_SECTOR_SIZE - 1)) == 0); > + assert((tr->bytes & (BDRV_SECTOR_SIZE - 1)) == 0); Are these assertions still necessary? req->bytes >> BDRV_SECTOR_BITS rounds down so we needed a multiple of BDRV_SECTOR_SIZE. Now DIV_ROUND_UP() is used so it doesn't matter. > + > + nr = add_notif_req(job, start, end, qemu_coroutine_self()); > > - assert(req->bs == blk_bs(job->common.blk)); > - assert((req->offset & (BDRV_SECTOR_SIZE - 1)) == 0); > - assert((req->bytes & (BDRV_SECTOR_SIZE - 1)) == 0); > + if (nr == NULL) { > + trace_backup_before_write_notify_skip(job, tr->offset, tr->bytes); > + } else { > + trace_backup_before_write_notify_start(job, tr->offset, tr->bytes, > nr, > + nr->start, nr->end, > nr->nb_wait); > + > + if (!job->has_errors) { > + qemu_co_queue_restart_all(&job->paused_workers); > + } > + co_aio_sleep_ns(blk_get_aio_context(job->common.blk), > + QEMU_CLOCK_REALTIME, WRITE_NOTIFY_TIMEOUT_NS); > + if (nr->nb_wait > 0) { > + /* timer expired and read request not finished */ > + ret = -EINVAL; #define EINVAL 22 /* Invalid argument */ Why did you choose this errno? EIO is the errno that kernel uses when I/O fails (e.g. hardware timeout). > @@ -586,50 +923,62 @@ static void coroutine_fn backup_run(void *opaque) > BackupCompleteData *data; > BlockDriverState *bs = blk_bs(job->common.blk); > int64_t end; > - int ret = 0; > + int i; > + bool is_top = job->sync_mode == MIRROR_SYNC_MODE_TOP; > + bool is_full = job->sync_mode == MIRROR_SYNC_MODE_FULL; > + > + trace_backup_run(); This trace event isn't useful if the guest has multiple drives. Please include arguments that correlate the event with specific objects like the BlockBackend/BlockDriverState/BlockJob instances, I/O request sector and length, etc. > BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError > on_err, > int is_read, int error) > { block_job_get_error_action() was copy-pasted from block_job_error_action(). Now there are two functions with similar names that duplicate code. Why can't you use the existing block_job_error_action() function?
signature.asc
Description: PGP signature