Create threads to execute pios in parallel - call them pio runners.
Use number of CPUs to determine the number of threads started.
From worker each pio is sent to a thread in round-robin fashion
through work_llist. Maintain the number of pios sent so we can wait
for them to be processed - NB we only want to keep the order of
execution of different pio types, which can be different from
the order of their completion. We send a batch of pios to the runners
and if necessary we wait for them to be processed before moving
forward - we need this for metadata writeback and flushes.

https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atana...@virtuozzo.com>
---
 drivers/md/dm-ploop-map.c    | 133 ++++++++++++++++++++++++++++++++---
 drivers/md/dm-ploop-target.c |  41 +++++++++--
 drivers/md/dm-ploop.h        |  14 +++-
 3 files changed, 170 insertions(+), 18 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index b3e8b934ff13..2579a4dbc834 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -20,6 +20,8 @@
 #include "dm-ploop.h"
 #include "dm-rq.h"
 
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio);
+
 #define PREALLOC_SIZE (128ULL * 1024 * 1024)
 
 static void ploop_handle_cleanup(struct ploop *ploop, struct pio *pio);
@@ -1955,6 +1957,35 @@ static void process_ploop_fsync_work(struct ploop 
*ploop, struct llist_node *llf
        }
 }
 
+static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio)
+{
+       int i;
+       struct ploop_worker *wrkr;
+
+       if (++ploop->last_used_runner >= ploop->nkt_runners)
+               ploop->last_used_runner = 0;
+       wrkr = ploop->kt_runners[ploop->last_used_runner];
+
+       atomic_inc(&ploop->kt_worker->inflight_pios);
+       llist_add((struct llist_node *)(&pio->list), &wrkr->work_llist);
+       wake_up_process(wrkr->task);
+
+       return 0;
+}
+
+static inline int ploop_runners_add_work_list(struct ploop *ploop, struct 
llist_node *list)
+{
+       struct llist_node *pos, *t;
+       struct pio *pio;
+
+       llist_for_each_safe(pos, t, list) {
+               pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+               ploop_runners_add_work(ploop, pio);
+       }
+
+       return 0;
+}
+
 void do_ploop_run_work(struct ploop *ploop)
 {
        LLIST_HEAD(deferred_pios);
@@ -2016,18 +2047,86 @@ void do_ploop_work(struct work_struct *ws)
        do_ploop_run_work(ploop);
 }
 
-int ploop_worker(void *data)
+int ploop_pio_runner(void *data)
 {
        struct ploop_worker *worker = data;
        struct ploop *ploop = worker->ploop;
+       struct llist_node *llwork;
+       struct pio *pio;
+       struct llist_node *pos, *t;
+       unsigned int old_flags = current->flags;
+       int did_process_pios = 0;
 
        for (;;) {
+               current->flags = old_flags;
+               clear_bit(PLOOP_WRK_BUSY, &worker->status);
                set_current_state(TASK_INTERRUPTIBLE);
 
-               if (kthread_should_stop()) {
-                       __set_current_state(TASK_RUNNING);
-                       break;
+check_for_more:
+               llwork = llist_del_all(&worker->work_llist);
+               if (!llwork) {
+                       if (did_process_pios) {
+                               did_process_pios = 0;
+                               
wake_up_interruptible(&ploop->dispatcher_wq_data);
+                       }
+                       /* Only stop when there is no more pios */
+                       if (kthread_should_stop()) {
+                               __set_current_state(TASK_RUNNING);
+                               break;
+                       }
+                       schedule();
+                       continue;
+               }
+               set_bit(PLOOP_WRK_BUSY, &worker->status);
+               __set_current_state(TASK_RUNNING);
+               old_flags = current->flags;
+               current->flags |= 
PF_IO_THREAD|PF_LOCAL_THROTTLE|PF_MEMALLOC_NOIO;
+
+               llist_for_each_safe(pos, t, llwork) {
+                       pio = list_entry((struct list_head *)pos, typeof(*pio), 
list);
+                       INIT_LIST_HEAD(&pio->list);
+                       switch (pio->queue_list_id) {
+                       case PLOOP_LIST_FLUSH:
+                               WARN_ON_ONCE(1);        /* We must not see 
flushes here */
+                               break;
+                       case PLOOP_LIST_PREPARE:
+                               // fsync pios can come here for endio
+                               // XXX: make it a FSYNC list
+                               ploop_pio_endio(pio);
+                               break;
+                       case PLOOP_LIST_DEFERRED:
+                               ploop_process_one_deferred_bio(ploop, pio);
+                               break;
+                       case PLOOP_LIST_COW:
+                               ploop_process_one_delta_cow(ploop, pio);
+                               break;
+                       case PLOOP_LIST_DISCARD:
+                               ploop_process_one_discard_pio(ploop, pio);
+                               break;
+                               // XXX: make it list MDWB
+                       case PLOOP_LIST_INVALID: /* resubmit sets the list id 
to invalid */
+                               ploop_submit_rw_mapped(ploop, pio);
+                               break;
+                       default:
+                               WARN_ON_ONCE(1);
+                       }
+                       atomic_dec(&ploop->kt_worker->inflight_pios);
                }
+               cond_resched();
+               did_process_pios = 1;
+               goto check_for_more;
+       }
+       return 0;
+}
+
+int ploop_worker(void *data)
+{
+       struct ploop_worker *worker = data;
+       struct ploop *ploop = worker->ploop;
+
+       for (;;) {
+               set_current_state(TASK_INTERRUPTIBLE);
+
 #ifdef USE_KTHREAD
                smp_rmb(); /* */
                if (llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) &&
@@ -2035,16 +2134,32 @@ int ploop_worker(void *data)
                        llist_empty(&ploop->pios[PLOOP_LIST_DEFERRED]) &&
                        llist_empty(&ploop->pios[PLOOP_LIST_DISCARD]) &&
                        llist_empty(&ploop->pios[PLOOP_LIST_COW]) &&
-                       llist_empty(&ploop->llresubmit_pios)
-                       )
-                       schedule();
+                       llist_empty(&ploop->llresubmit_pios) &&
+                       !ploop->force_md_writeback
+                       ) {
+
+                               if (kthread_should_stop()) {
+                                       
wait_event_interruptible(ploop->dispatcher_wq_data,
+                                                       
(!ploop_runners_have_pending(ploop)));
+                                       __set_current_state(TASK_RUNNING);
+                                       break;
+                               }
+                               schedule();
+                               /* now check for pending work */
+               }
 
                __set_current_state(TASK_RUNNING);
                do_ploop_run_work(ploop);
-               cond_resched();
+               cond_resched(); /* give other processes chance to run */
 #else
-               schedule();     // just do nothing yet
+               schedule();     /* just do nothing yet */
 #endif
+               if (kthread_should_stop()) {
+                       wait_event_interruptible(ploop->dispatcher_wq_data,
+                                               
(!ploop_runners_have_pending(ploop)));
+                       __set_current_state(TASK_RUNNING);
+                       break;
+               }
        }
        return 0;
 }
diff --git a/drivers/md/dm-ploop-target.c b/drivers/md/dm-ploop-target.c
index 772a5fd25c47..95ed3e26f2b8 100644
--- a/drivers/md/dm-ploop-target.c
+++ b/drivers/md/dm-ploop-target.c
@@ -170,8 +170,8 @@ static void ploop_destroy(struct ploop *ploop)
        }
 
        if (ploop->kt_worker) {
+               ploop->force_md_writeback = 1;
                wake_up_process(ploop->kt_worker->task);
-               // FIXME: a better way to wait for lists to drain
                /* try to send all pending - if we have partial io and enospc 
end bellow */
                while (!llist_empty(&ploop->pios[PLOOP_LIST_FLUSH]) ||
                        !llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]) ||
@@ -183,12 +183,25 @@ static void ploop_destroy(struct ploop *ploop)
                        smp_rmb(); /* */
                }
 
+               if (ploop->kt_runners) {
+                       for (i = 0; i < ploop->nkt_runners; i++) {
+                               if (ploop->kt_runners[i]) {
+                                       
wake_up_process(ploop->kt_runners[i]->task);
+                                       
kthread_stop(ploop->kt_runners[i]->task);
+                                       kfree(ploop->kt_runners[i]);
+                               }
+                       }
+               }
+
                kthread_stop(ploop->kt_worker->task);   /* waits for the thread 
to stop */
+
                WARN_ON(!llist_empty(&ploop->pios[PLOOP_LIST_PREPARE]));
                WARN_ON(!llist_empty(&ploop->llresubmit_pios));
-               // TODO: check if any pios left and end them with error
+               WARN_ON(!llist_empty(&ploop->enospc_pios));
+               kfree(ploop->kt_runners);
                kfree(ploop->kt_worker);
        }
+       WARN_ON_ONCE(ploop_has_pending_activity(ploop));
 
        for (i = 0; i < 2; i++)
                percpu_ref_exit(&ploop->inflight_bios_ref[i]);
@@ -355,7 +368,8 @@ ALLOW_ERROR_INJECTION(ploop_add_deltas_stack, ERRNO);
                argv++;                                         \
        } while (0);
 
-static struct ploop_worker *ploop_worker_create(struct ploop *ploop)
+static struct ploop_worker *ploop_worker_create(struct ploop *ploop,
+       int (*worker_fn)(void *), const char *pref, int id)
 {
        struct ploop_worker *worker;
        struct task_struct *task;
@@ -365,12 +379,13 @@ static struct ploop_worker *ploop_worker_create(struct 
ploop *ploop)
                return NULL;
 
        worker->ploop = ploop;
-       task = kthread_create(ploop_worker, worker, "ploop-%d-0",
-                               current->pid);
+       task = kthread_create(worker_fn, worker, "ploop-%d-%s-%d",
+                       current->pid, pref, id);
 
        if (IS_ERR(task))
                goto out_err;
        worker->task = task;
+       init_llist_head(&worker->work_llist);
 
        wake_up_process(task);
        // TODO: handle cgroups
@@ -543,10 +558,24 @@ static int ploop_ctr(struct dm_target *ti, unsigned int 
argc, char **argv)
                goto err;
 
 
-       ploop->kt_worker = ploop_worker_create(ploop);
+       init_waitqueue_head(&ploop->dispatcher_wq_data);
+
+       ploop->kt_worker = ploop_worker_create(ploop, ploop_worker, "d", 0);
        if (!ploop->kt_worker)
                goto err;
 
+/* make it a param = either module or cpu based or dev req queue */
+#define PLOOP_PIO_RUNNERS nr_cpu_ids
+       ploop->kt_runners = kcalloc(PLOOP_PIO_RUNNERS, sizeof(struct kt_worker 
*), GFP_KERNEL);
+       if (!ploop->kt_runners)
+               goto err;
+
+       ploop->nkt_runners = PLOOP_PIO_RUNNERS;
+       for (i=0; i < ploop->nkt_runners; i++) {
+               ploop->kt_runners[i] = ploop_worker_create(ploop, 
ploop_pio_runner, "r", i+1);
+               if (!ploop->kt_runners[i])
+                       goto err;
+       }
        ret = ploop_add_deltas_stack(ploop, &argv[0], argc);
        if (ret)
                goto err;
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 02e47033f97c..f4f1449c82f3 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -148,14 +148,18 @@ enum {
 struct ploop_worker {
        struct ploop            *ploop;
        struct task_struct      *task;
-       u64                     kcov_handle;
+       struct llist_head       work_llist;
+#define PLOOP_WRK_BUSY         (1U<<0)
+       unsigned long           status;
+       atomic_t                inflight_pios;
 };
 
 struct ploop {
+       struct wait_queue_head dispatcher_wq_data;
        struct dm_target *ti;
 #define PLOOP_PRQ_POOL_SIZE 512 /* Twice nr_requests from blk_mq_init_sched() 
*/
        mempool_t *prq_pool;
-#define PLOOP_PIO_POOL_SIZE 256
+#define PLOOP_PIO_POOL_SIZE 512
        mempool_t *pio_pool;
 
        struct rb_root bat_entries;
@@ -201,7 +205,10 @@ struct ploop {
        struct work_struct worker;
        struct work_struct event_work;
 
-       struct ploop_worker *kt_worker;
+       struct ploop_worker *kt_worker;  /* dispatcher thread */
+       struct ploop_worker **kt_runners; /* pio runners */
+       unsigned int nkt_runners;
+       unsigned int last_used_runner;
        struct completion inflight_bios_ref_comp;
        struct percpu_ref inflight_bios_ref[2];
        bool inflight_ref_comp_pending;
@@ -611,6 +618,7 @@ extern void ploop_enospc_timer(struct timer_list *timer);
 extern loff_t ploop_llseek_hole(struct dm_target *ti, loff_t offset, int 
whence);
 
 extern int ploop_worker(void *data);
+extern int ploop_pio_runner(void *data);
 
 extern void ploop_disable_writeback_delay(struct ploop *ploop);
 extern void ploop_enable_writeback_delay(struct ploop *ploop);
-- 
2.43.0

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to