The pio should be the main entity of the whole driver, and waiting for a locked cluster should be done via pio postponing. So, we rework the merge to fit that.
Signed-off-by: Kirill Tkhai <[email protected]> --- drivers/md/dm-ploop-cmd.c | 200 ++++++++++++------------------------------ drivers/md/dm-ploop-map.c | 20 +++- drivers/md/dm-ploop-target.c | 5 + drivers/md/dm-ploop.h | 32 ++++--- 4 files changed, 92 insertions(+), 165 deletions(-) diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c index b36bb158a3ac..401380de25db 100644 --- a/drivers/md/dm-ploop-cmd.c +++ b/drivers/md/dm-ploop-cmd.c @@ -3,21 +3,11 @@ #include <linux/uio.h> #include <linux/ctype.h> #include <linux/umh.h> +#include <linux/sched/signal.h> #include "dm-ploop.h" #define DM_MSG_PREFIX "ploop" -static void ploop_queue_deferred_cmd(struct ploop *ploop, struct ploop_cmd *cmd) -{ - unsigned long flags; - - spin_lock_irqsave(&ploop->deferred_lock, flags); - BUG_ON(ploop->deferred_cmd && ploop->deferred_cmd != cmd); - ploop->deferred_cmd = cmd; - spin_unlock_irqrestore(&ploop->deferred_lock, flags); - queue_work(ploop->wq, &ploop->worker); -} - /* * Assign newly allocated memory for BAT array and holes_bitmap * before grow. 
@@ -557,8 +547,6 @@ static int ploop_resize(struct ploop *ploop, sector_t new_sectors) cmd.resize.hb_nr = hb_nr; cmd.resize.new_sectors = new_sectors; cmd.resize.md0 = md0; - cmd.retval = 0; - cmd.ploop = ploop; ploop_suspend_submitting_pios(ploop); ret = process_resize_cmd(ploop, &cmd); @@ -570,106 +558,75 @@ static int ploop_resize(struct ploop *ploop, sector_t new_sectors) free_md_pages_tree(&cmd.resize.md_pages_root); return ret; } - -static void ploop_queue_deferred_cmd_wrapper(struct ploop *ploop, - int ret, void *data) +static void service_pio_endio(struct pio *pio, void *data, blk_status_t status) { - struct ploop_cmd *cmd = data; - - if (ret) { - /* kwork will see this at next time it is on cpu */ - WRITE_ONCE(cmd->retval, ret); - } - atomic_inc(&cmd->merge.nr_available); - ploop_queue_deferred_cmd(cmd->ploop, cmd); -} - -/* Find mergeable cluster and return it in cmd->merge.cluster */ -static bool iter_delta_clusters(struct ploop *ploop, struct ploop_cmd *cmd) -{ - unsigned int dst_cluster, *cluster = &cmd->merge.cluster; - u8 level; - bool skip; - - BUG_ON(cmd->type != PLOOP_CMD_MERGE_SNAPSHOT); - - for (; *cluster < ploop->nr_bat_entries; ++*cluster) { - /* - * Check *cluster is provided by the merged delta. - * We are in kwork, so bat_rwlock is not needed - * (see comment in process_one_deferred_bio()). - */ - /* FIXME: Optimize this. ploop_bat_entries() is overkill */ - dst_cluster = ploop_bat_entries(ploop, *cluster, &level); - if (dst_cluster == BAT_ENTRY_NONE || - level != ploop->nr_deltas - 2) - continue; - - spin_lock_irq(&ploop->deferred_lock); - skip = find_lk_of_cluster(ploop, *cluster); - spin_unlock_irq(&ploop->deferred_lock); - if (skip) { - /* - * Cluster is locked (maybe, under COW). - * Skip it and try to repeat later. 
- */ - cmd->merge.do_repeat = true; - continue; - } + struct ploop *ploop = pio->ploop; + blk_status_t *status_ptr = data; + unsigned long flags; - return true; + if (unlikely(status)) { + spin_lock_irqsave(&ploop->err_status_lock, flags); + *status_ptr = status; + spin_unlock_irqrestore(&ploop->err_status_lock, flags); } - return false; + if (atomic_dec_return(&ploop->service_pios) < MERGE_PIOS_MAX / 2) + wake_up(&ploop->service_wq); } -static void process_merge_latest_snapshot_cmd(struct ploop *ploop, - struct ploop_cmd *cmd) +static int process_merge_latest_snapshot(struct ploop *ploop) { - unsigned int dst_cluster, *cluster = &cmd->merge.cluster; - u8 level; - - if (cmd->retval) - goto out; - - while (iter_delta_clusters(ploop, cmd)) { - /* - * We are in kwork, so bat_rwlock is not needed - * (we can't race with changing BAT, since cmds - * are processed before bios and piwb is sync). - */ - /* FIXME: Optimize this: ploop_bat_entries() is overkill */ - dst_cluster = ploop_bat_entries(ploop, *cluster, &level); + static blk_status_t service_status; + struct bio_vec bvec = {0}; + struct pio *pio; + int ret = 0; + u32 clu; - /* Check we can submit one more cow in parallel */ - if (!atomic_add_unless(&cmd->merge.nr_available, -1, 0)) - return; - /* - * This adds cluster lk. Further write bios to *cluster will go - * from ploop_map to kwork (because bat_levels[*cluster] is not - * top_level()), so they will see the lk. 
- */ - if (submit_cluster_cow(ploop, level, *cluster, dst_cluster, - ploop_queue_deferred_cmd_wrapper, cmd)) { - atomic_inc(&cmd->merge.nr_available); - cmd->retval = -ENOMEM; - goto out; + for (clu = 0; clu < ploop->nr_bat_entries; clu++) { + if (fatal_signal_pending(current)) { + ret = -EINTR; + break; + } + pio = kmalloc(sizeof(*pio), GFP_KERNEL); + if (!pio) { + ret = -ENOMEM; + break; + } + init_pio(ploop, REQ_OP_WRITE, pio); + pio->free_on_endio = true; + pio->bi_io_vec = &bvec; + pio->bi_iter.bi_sector = CLU_TO_SEC(ploop, clu); + pio->bi_iter.bi_size = 0; + pio->bi_iter.bi_idx = 0; + pio->bi_iter.bi_bvec_done = 0; + pio->endio_cb = service_pio_endio; + pio->endio_cb_data = &service_status; + pio->is_fake_merge = true; + WARN_ON_ONCE(!fake_merge_pio(pio)); + + defer_pios(ploop, pio, NULL); + + if (atomic_inc_return(&ploop->service_pios) == MERGE_PIOS_MAX) { + wait_event(ploop->service_wq, + atomic_read(&ploop->service_pios) < MERGE_PIOS_MAX); } - ++*cluster; + if (unlikely(READ_ONCE(service_status))) + break; } -out: - if (atomic_read(&cmd->merge.nr_available) != NR_MERGE_BIOS) { - /* Wait till last COW queues us */ - return; + + wait_event(ploop->service_wq, !atomic_read(&ploop->service_pios)); + if (!ret) { + spin_lock_irq(&ploop->err_status_lock); + ret = blk_status_to_errno(service_status); + spin_unlock_irq(&ploop->err_status_lock); } - complete(&cmd->comp); /* Last touch of cmd memory */ + return ret; } static int ploop_merge_latest_snapshot(struct ploop *ploop) { - struct ploop_cmd cmd; struct file *file; u8 level; int ret; @@ -680,33 +637,14 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop) return -EROFS; if (ploop->nr_deltas < 2) return -ENOENT; -again: - memset(&cmd, 0, sizeof(cmd)); - cmd.type = PLOOP_CMD_MERGE_SNAPSHOT; - cmd.ploop = ploop; - atomic_set(&cmd.merge.nr_available, NR_MERGE_BIOS); - - init_completion(&cmd.comp); - ploop_queue_deferred_cmd(ploop, &cmd); - ret = wait_for_completion_interruptible(&cmd.comp); - if (ret) { - 
/* - * process_merge_latest_snapshot_cmd() will see this - * later or earlier. Take a lock if you want earlier. - */ - WRITE_ONCE(cmd.retval, -EINTR); - wait_for_completion(&cmd.comp); - } - if (cmd.retval) + ret = process_merge_latest_snapshot(ploop); + if (ret) goto out; - if (cmd.merge.do_repeat) - goto again; - /* Delta merged. Release delta's file */ - cmd.retval = ploop_suspend_submitting_pios(ploop); - if (cmd.retval) + ret = ploop_suspend_submitting_pios(ploop); + if (ret) goto out; write_lock_irq(&ploop->bat_rwlock); @@ -719,7 +657,7 @@ static int ploop_merge_latest_snapshot(struct ploop *ploop) ploop_resume_submitting_pios(ploop); out: - return cmd.retval; + return ret; } static void notify_delta_merged(struct ploop *ploop, u8 level, @@ -1154,28 +1092,6 @@ static int ploop_flip_upper_deltas(struct ploop *ploop) return process_flip_upper_deltas(ploop); } -/* Handle user commands requested via "message" interface */ -void process_deferred_cmd(struct ploop *ploop) - __releases(&ploop->deferred_lock) - __acquires(&ploop->deferred_lock) -{ - struct ploop_cmd *cmd = ploop->deferred_cmd; - - if (likely(!cmd)) - return; - - ploop->deferred_cmd = NULL; - spin_unlock_irq(&ploop->deferred_lock); - - if (cmd->type == PLOOP_CMD_MERGE_SNAPSHOT) { - process_merge_latest_snapshot_cmd(ploop, cmd); - } else { - cmd->retval = -EINVAL; - complete(&cmd->comp); - } - spin_lock_irq(&ploop->deferred_lock); -} - static int ploop_get_event(struct ploop *ploop, char *result, unsigned int maxlen) { unsigned int sz = 0; diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c index bcdc63a1d5c9..dc2268670f70 100644 --- a/drivers/md/dm-ploop-map.c +++ b/drivers/md/dm-ploop-map.c @@ -70,6 +70,7 @@ void init_pio(struct ploop *ploop, unsigned int bi_op, struct pio *pio) pio->bi_op = bi_op; pio->wants_discard_index_cleanup = false; pio->is_data_alloc = false; + pio->is_fake_merge = false; pio->free_on_endio = false; pio->ref_index = PLOOP_REF_INDEX_INVALID; pio->bi_status = 
BLK_STS_OK; @@ -478,6 +479,14 @@ static bool pio_endio_if_all_zeros(struct pio *pio) return true; } +static bool pio_endio_if_merge_fake_pio(struct pio *pio) +{ + if (likely(!fake_merge_pio(pio))) + return false; + pio_endio(pio); + return true; +} + static int punch_hole(struct file *file, loff_t pos, loff_t len) { return vfs_fallocate(file, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, @@ -1136,9 +1145,9 @@ static bool postpone_if_cluster_locked(struct ploop *ploop, struct pio *pio, return e_h != NULL; } -int submit_cluster_cow(struct ploop *ploop, unsigned int level, - unsigned int cluster, unsigned int dst_cluster, - void (*end_fn)(struct ploop *, int, void *), void *data) +static int submit_cluster_cow(struct ploop *ploop, unsigned int level, + unsigned int cluster, unsigned int dst_cluster, + void (*end_fn)(struct ploop *, int, void *), void *data) { struct ploop_cow *cow = NULL; struct pio *pio = NULL; @@ -1392,6 +1401,8 @@ static int process_one_deferred_bio(struct ploop *ploop, struct pio *pio, if (cluster_is_in_top_delta(ploop, cluster)) { /* Already mapped */ + if (pio_endio_if_merge_fake_pio(pio)) + goto out; goto queue; } else if (!op_is_write(pio->bi_op)) { /* @@ -1538,13 +1549,10 @@ void do_ploop_work(struct work_struct *ws) * * Currenly, it's impossible to submit two bat pages update * in parallel, since the update uses global ploop->bat_page. - * Note, that process_deferred_cmd() expects there is no - * pending index wb. 
*/ ploop_index_wb_init(&piwb, ploop); spin_lock_irq(&ploop->deferred_lock); - process_deferred_cmd(ploop); process_delta_wb(ploop, &piwb); list_splice_init(&ploop->deferred_pios, &deferred_pios); diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c index 76f66fe11de1..3e05895d1cfe 100644 --- a/drivers/md/dm-ploop-target.c +++ b/drivers/md/dm-ploop-target.c @@ -123,10 +123,9 @@ void free_md_pages_tree(struct rb_root *root) static bool ploop_has_pending_activity(struct ploop *ploop) { - bool has; + bool has = false; spin_lock_irq(&ploop->deferred_lock); - has = ploop->deferred_cmd; has |= !list_empty(&ploop->deferred_pios); has |= !list_empty(&ploop->discard_pios); has |= !list_empty(&ploop->delta_cow_action_list); @@ -312,7 +311,9 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv) } rwlock_init(&ploop->bat_rwlock); + spin_lock_init(&ploop->err_status_lock); init_rwsem(&ploop->ctl_rwsem); + init_waitqueue_head(&ploop->service_wq); spin_lock_init(&ploop->inflight_lock); spin_lock_init(&ploop->deferred_lock); diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h index 2a474e5d3cb6..a2d6866d99a5 100644 --- a/drivers/md/dm-ploop.h +++ b/drivers/md/dm-ploop.h @@ -40,12 +40,10 @@ struct ploop_delta { bool is_raw; }; +#define MERGE_PIOS_MAX 64 + struct ploop_cmd { -#define PLOOP_CMD_MERGE_SNAPSHOT 3 struct completion comp; - struct ploop *ploop; - unsigned int type; - int retval; union { struct { sector_t new_sectors; @@ -62,12 +60,6 @@ struct ploop_cmd { unsigned int cluster, dst_cluster; struct pio *pio; } resize; - struct { -#define NR_MERGE_BIOS 64 - atomic_t nr_available; - unsigned int cluster; /* Currently iterated cluster */ - bool do_repeat; - } merge; }; }; @@ -173,8 +165,11 @@ struct ploop { struct list_head resubmit_pios; /* After partial IO */ struct list_head enospc_pios; /* Delayed after ENOSPC */ + atomic_t service_pios; + struct wait_queue_head service_wq; + + spinlock_t err_status_lock; struct 
rw_semaphore ctl_rwsem; - struct ploop_cmd *deferred_cmd; /* * List of locked clusters (no write is possible). @@ -230,6 +225,7 @@ struct pio { bool is_data_alloc:1; bool wants_discard_index_cleanup:1; + bool is_fake_merge:1; bool free_on_endio:1; /* * 0 and 1 are related to inflight_bios_ref[], @@ -486,6 +482,16 @@ static inline struct hlist_head *ploop_htable_slot(struct hlist_head head[], u32 return &head[hash_32(clu, PLOOP_HASH_TABLE_BITS)]; } +static inline bool fake_merge_pio(struct pio *pio) +{ + if (pio->is_fake_merge) { + WARN_ON_ONCE(pio->bi_iter.bi_size || + pio->bi_op != REQ_OP_WRITE); + return true; + } + return false; +} + extern void md_page_insert(struct ploop *ploop, struct md_page *md); extern void ploop_free_md_page(struct md_page *md); extern void free_md_pages_tree(struct rb_root *root); @@ -499,7 +505,6 @@ extern void defer_pios(struct ploop *ploop, struct pio *pio, struct list_head *p extern void do_ploop_work(struct work_struct *ws); extern void do_ploop_fsync_work(struct work_struct *ws); extern void ploop_event_work(struct work_struct *work); -extern void process_deferred_cmd(struct ploop *ploop); extern int ploop_clone_and_map(struct dm_target *ti, struct request *rq, union map_info *map_context, struct request **clone); extern struct pio *find_lk_of_cluster(struct ploop *ploop, u32 cluster); @@ -514,9 +519,6 @@ extern void ploop_reset_bat_update(struct ploop_index_wb *); extern void ploop_submit_index_wb_sync(struct ploop *, struct ploop_index_wb *); extern int ploop_message(struct dm_target *ti, unsigned int argc, char **argv, char *result, unsigned int maxlen); -extern int submit_cluster_cow(struct ploop *ploop, unsigned int level, - unsigned int cluster, unsigned int dst_cluster, - void (*end_fn)(struct ploop *, int, void *), void *data); extern struct pio * alloc_pio_with_pages(struct ploop *ploop); extern void free_pio_with_pages(struct ploop *ploop, struct pio *pio); _______________________________________________ Devel mailing 
list [email protected] https://lists.openvz.org/mailman/listinfo/devel
