Create threads to execute pios in parallel - call them pio runners. Use the number of CPUs to determine the number of threads started. From the worker, each pio is sent to a thread in round-robin fashion through work_llist. Maintain the number of pios sent so we can wait for them to be processed - NB we only want to keep the order of execution of different pio types, which can differ from the order of their completion. We send a batch of pios to the runners and, if necessary, we wait for them to be processed before moving forward - we need this for metadata writeback and flushes.
https://virtuozzo.atlassian.net/browse/VSTOR-91821 Signed-off-by: Alexander Atanasov <alexander.atana...@virtuozzo.com> --- drivers/md/dm-ploop-map.c | 133 ++++++++++++++++++++++++++++++++--- drivers/md/dm-ploop-target.c | 41 +++++++++-- drivers/md/dm-ploop.h | 14 +++- 3 files changed, 170 insertions(+), 18 deletions(-) diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c index b3e8b934ff13..2579a4dbc834 100644 --- a/drivers/md/dm-ploop-map.c +++ b/drivers/md/dm-ploop-map.c @@ -20,6 +20,8 @@ #include "dm-ploop.h" #include "dm-rq.h" +static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio); + #define PREALLOC_SIZE (128ULL * 1024 * 1024) static void ploop_handle_cleanup(struct ploop *ploop, struct pio *pio); @@ -1955,6 +1957,35 @@ static void process_ploop_fsync_work(struct ploop *ploop, struct llist_node *llf } } +static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio) +{ + int i; + struct ploop_worker *wrkr; + + if (++ploop->last_used_runner >= ploop->nkt_runners) + ploop->last_used_runner = 0; + wrkr = ploop->kt_runners[ploop->last_used_runner]; + + atomic_inc(&ploop->kt_worker->inflight_pios); + llist_add((struct llist_node *)(&pio->list), &wrkr->work_llist); + wake_up_process(wrkr->task); + + return 0; +} + +static inline int ploop_runners_add_work_list(struct ploop *ploop, struct llist_node *list) +{ + struct llist_node *pos, *t; + struct pio *pio; + + llist_for_each_safe(pos, t, list) { + pio = list_entry((struct list_head *)pos, typeof(*pio), list); + ploop_runners_add_work(ploop, pio); + } + + return 0; +} + void do_ploop_run_work(struct ploop *ploop) { LLIST_HEAD(deferred_pios); @@ -2016,18 +2047,86 @@ void do_ploop_work(struct work_struct *ws) do_ploop_run_work(ploop); } -int ploop_worker(void *data) +int ploop_pio_runner(void *data) { struct ploop_worker *worker = data; struct ploop *ploop = worker->ploop; + struct llist_node *llwork; + struct pio *pio; + struct llist_node *pos, *t; + unsigned int 
old_flags = current->flags; + int did_process_pios = 0; for (;;) { + current->flags = old_flags; + clear_bit(PLOOP_WRK_BUSY, &worker->status); set_current_state(TASK_INTERRUPTIBLE); - if (kthread_should_stop()) { - __set_current_state(TASK_RUNNING); - break; +check_for_more: + llwork = llist_del_all(&worker->work_llist); + if (!llwork) { + if (did_process_pios) { + did_process_pios = 0; + wake_up_interruptible(&ploop->dispatcher_wq_data); + } + /* Only stop when there is no more pios */ + if (kthread_should_stop()) { + __set_current_state(TASK_RUNNING); + break; + } + schedule(); + continue; + } + set_bit(PLOOP_WRK_BUSY, &worker->status); + __set_current_state(TASK_RUNNING); + old_flags = current->flags; + current->flags |= PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO; + + llist_for_each_safe(pos, t, llwork) { + pio = list_entry((struct list_head *)pos, typeof(*pio), list); + INIT_LIST_HEAD(&pio->list); + switch (pio->queue_list_id) { + case PLOOP_LIST_FLUSH: + WARN_ON_ONCE(1); /* We must not see flushes here */ + break; + case PLOOP_LIST_PREPARE: + // fsync pios can come here for endio + // XXX: make it a FSYNC list + ploop_pio_endio(pio); + break; + case PLOOP_LIST_DEFERRED: + ploop_process_one_deferred_bio(ploop, pio); + break; + case PLOOP_LIST_COW: + ploop_process_one_delta_cow(ploop, pio); + break; + case PLOOP_LIST_DISCARD: + ploop_process_one_discard_pio(ploop, pio); + break; + // XXX: make it list MDWB + case PLOOP_LIST_INVALID: /* resubmit sets the list id to invalid */ + ploop_submit_rw_mapped(ploop, pio); + break; + default: + WARN_ON_ONCE(1); + } + atomic_dec(&ploop->kt_worker->inflight_pios); } + cond_resched(); + did_process_pios = 1; + goto check_for_more; + } + return 0; +} + +int ploop_worker(void *data) +{ + struct ploop_worker *worker = data; + struct ploop *ploop = worker->ploop; + + for (;;) { + set_current_state(TASK_INTERRUPTIBLE); + #ifdef USE_KTHREAD smp_rmb(); /* */ if (llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) && @@ -2035,16 
+2134,32 @@ int ploop_worker(void *data) llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) && llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) && llist_empty(&ploop->pios[PLOOP_LIST_COW]) && - llist_empty(&ploop->llresubmit_pios) - ) - schedule(); + llist_empty(&ploop->llresubmit_pios) && + !ploop->force_md_writeback + ) { + + if (kthread_should_stop()) { + wait_event_interruptible(ploop->dispatcher_wq_data, + (!ploop_runners_have_pending(ploop))); + __set_current_state(TASK_RUNNING); + break; + } + schedule(); + /* now check for pending work */ + } __set_current_state(TASK_RUNNING); do_ploop_run_work(ploop); - cond_resched(); + cond_resched(); /* give other processes chance to run */ #else - schedule(); // just do nothing yet + schedule(); /* just do nothing yet */ #endif + if (kthread_should_stop()) { + wait_event_interruptible(ploop->dispatcher_wq_data, + (!ploop_runners_have_pending(ploop))); + __set_current_state(TASK_RUNNING); + break; + } } return 0; } diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c index 772a5fd25c47..95ed3e26f2b8 100644 --- a/drivers/md/dm-ploop-target.c +++ b/drivers/md/dm-ploop-target.c @@ -170,8 +170,8 @@ static void ploop_destroy(struct ploop *ploop) } if (ploop->kt_worker) { + ploop->force_md_writeback = 1; wake_up_process(ploop->kt_worker->task); - // FIXME: a better way to wait for lists to drain /* try to send all pending - if we have partial io and enospc end bellow */ while (!llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) || !llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) || @@ -183,12 +183,25 @@ static void ploop_destroy(struct ploop *ploop) smp_rmb(); /* */ } + if (ploop->kt_runners) { + for (i = 0; i < ploop->nkt_runners; i++) { + if (ploop->kt_runners[i]) { + wake_up_process(ploop->kt_runners[i]->task); + kthread_stop(ploop->kt_runners[i]->task); + kfree(ploop->kt_runners[i]); + } + } + } + kthread_stop(ploop->kt_worker->task); /* waits for the thread to stop */ + 
WARN_ON(!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE])); WARN_ON(!llist_empty(&ploop->llresubmit_pios)); - // TODO: check if any pios left and end them with error + WARN_ON(!llist_empty(&ploop->enospc_pios)); + kfree(ploop->kt_runners); kfree(ploop->kt_worker); } + WARN_ON_ONCE(ploop_has_pending_activity(ploop)); for (i = 0; i < 2; i++) percpu_ref_exit(&ploop->inflight_bios_ref[i]); @@ -355,7 +368,8 @@ ALLOW_ERROR_INJECTION(ploop_add_deltas_stack, ERRNO); argv++; \ } while (0); -static struct ploop_worker *ploop_worker_create(struct ploop *ploop) +static struct ploop_worker *ploop_worker_create(struct ploop *ploop, + int (*worker_fn)(void *), const char *pref, int id) { struct ploop_worker *worker; struct task_struct *task; @@ -365,12 +379,13 @@ static struct ploop_worker *ploop_worker_create(struct ploop *ploop) return NULL; worker->ploop = ploop; - task = kthread_create(ploop_worker, worker, "ploop-%d-0", - current->pid); + task = kthread_create(worker_fn, worker, "ploop-%d-%s-%d", + current->pid, pref, id); if (IS_ERR(task)) goto out_err; worker->task = task; + init_llist_head(&worker->work_llist); wake_up_process(task); // TODO: handle cgroups @@ -543,10 +558,24 @@ static int ploop_ctr(struct dm_target *ti, unsigned int argc, char **argv) goto err; - ploop->kt_worker = ploop_worker_create(ploop); + init_waitqueue_head(&ploop->dispatcher_wq_data); + + ploop->kt_worker = ploop_worker_create(ploop, ploop_worker, "d", 0); if (!ploop->kt_worker) goto err; +/* make it a param = either module or cpu based or dev req queue */ +#define PLOOP_PIO_RUNNERS nr_cpu_ids + ploop->kt_runners = kcalloc(PLOOP_PIO_RUNNERS, sizeof(struct kt_worker *), GFP_KERNEL); + if (!ploop->kt_runners) + goto err; + + ploop->nkt_runners = PLOOP_PIO_RUNNERS; + for (i=0; i < ploop->nkt_runners; i++) { + ploop->kt_runners[i] = ploop_worker_create(ploop, ploop_pio_runner, "r", i+1); + if (!ploop->kt_runners[i]) + goto err; + } ret = ploop_add_deltas_stack(ploop, &argv[0], argc); if (ret) goto err; 
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h index 02e47033f97c..f4f1449c82f3 100644 --- a/drivers/md/dm-ploop.h +++ b/drivers/md/dm-ploop.h @@ -148,14 +148,18 @@ enum { struct ploop_worker { struct ploop *ploop; struct task_struct *task; - u64 kcov_handle; + struct llist_head work_llist; +#define PLOOP_WRK_BUSY (1U<<0) + unsigned long status; + atomic_t inflight_pios; }; struct ploop { + struct wait_queue_head dispatcher_wq_data; struct dm_target *ti; #define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() */ mempool_t *prq_pool; -#define PLOOP_PIO_POOL_SIZE 256 +#define PLOOP_PIO_POOL_SIZE 512 mempool_t *pio_pool; struct rb_root bat_entries; @@ -201,7 +205,10 @@ struct ploop { struct work_struct worker; struct work_struct event_work; - struct ploop_worker *kt_worker; + struct ploop_worker *kt_worker; /* dispatcher thread */ + struct ploop_worker **kt_runners; /* pio runners */ + unsigned int nkt_runners; + unsigned int last_used_runner; struct completion inflight_bios_ref_comp; struct percpu_ref inflight_bios_ref[2]; bool inflight_ref_comp_pending; @@ -611,6 +618,7 @@ extern void ploop_enospc_timer(struct timer_list *timer); extern loff_t ploop_llseek_hole(struct dm_target *ti, loff_t offset, int whence); extern int ploop_worker(void *data); +extern int ploop_pio_runner(void *data); extern void ploop_disable_writeback_delay(struct ploop *ploop); extern void ploop_enable_writeback_delay(struct ploop *ploop); -- 2.43.0 _______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel