The commit is pushed to "branch-rh9-5.14.0-427.44.1.vz9.80.x-ovz" and will 
appear at g...@bitbucket.org:openvz/vzkernel.git
after rh9-5.14.0-427.44.1.vz9.80.6
------>
commit 050030db3438c01f105d4904a73d615a7dc124c8
Author: Alexander Atanasov <alexander.atana...@virtuozzo.com>
Date:   Fri Jan 24 17:36:29 2025 +0200

    dm-ploop: rework bat completion logic
    
    A pio may complete after the md page update. In that case we
    must not complete the update, but wait for the last data pio
    and only then complete them all.
    
    https://virtuozzo.atlassian.net/browse/VSTOR-91821
    Signed-off-by: Alexander Atanasov <alexander.atana...@virtuozzo.com>
    
    ======
    Patchset description:
    ploop: optimizations and scaling
    
    Ploop processes requests in different threads in parallel
    where possible, which results in a significant improvement in
    performance and makes further optimizations possible.
    
    Known bugs:
      - delayed metadata writeback is not working and is missing error handling
         - patch to disable it until fixed
      - fast path is not working - causes rcu lockups - patch to disable it
    
    Further improvements:
      - optimize md pages lookups
    
    Alexander Atanasov (50):
      dm-ploop: md_pages map all pages at creation time
      dm-ploop: Use READ_ONCE/WRITE_ONCE to access md page data
      dm-ploop: fsync after all pios are sent
      dm-ploop: move md status to use proper bitops
      dm-ploop: convert wait_list and wb_batch_llist to use lockless lists
      dm-ploop: convert enospc handling to use lockless lists
      dm-ploop: convert suspended_pios list to use lockless list
      dm-ploop: convert the rest of the lists to use llist variant
      dm-ploop: combine processing of pios thru prepare list and remove
        fsync worker
      dm-ploop: move from wq to kthread
      dm-ploop: move preparations of pios into the caller from worker
      dm-ploop: fast path execution for reads
      dm-ploop: do not use a wrapper for set_bit to make a page writeback
      dm-ploop: BAT use only one list for writeback
      dm-ploop: make md writeback timeout to be per page
      dm-ploop: add interface to disable bat writeback delay
      dm-ploop: convert wb_batch_list to lockless variant
      dm-ploop: convert high_prio to status
      dm-ploop: split cow processing into two functions
      dm-ploop: convert md page rw lock to spin lock
      dm-ploop: convert bat_rwlock to bat_lock spinlock
      dm-ploop: prepare bat updates under bat_lock
      dm-ploop: make ploop_bat_write_complete ready for parallel pio
        completion
      dm-ploop: make ploop_submit_metadata_writeback return number of
        requests sent
      dm-ploop: introduce pio runner threads
      dm-ploop: add pio list ids to be used when passing pios to runners
      dm-ploop: process pios via runners
      dm-ploop: disable metadata writeback delay
      dm-ploop: disable fast path
      dm-ploop: use lockless lists for chained cow updates list
      dm-ploop: use lockless lists for data ready pios
      dm-ploop: give runner threads better name
      dm-ploop: resize operation - add holes bitmap locking
      dm-ploop: remove unnecessary operations
      dm-ploop: use filp per thread
      dm-ploop: catch if we try to advance pio past bio end
      dm-ploop: support REQ_FUA for data pios
      dm-ploop: proplerly access nr_bat_entries
      dm-ploop: fix locking and improve error handling when submitting pios
      dm-ploop: fix how ENOTBLK is handled
      dm-ploop: sync when suspended or stopping
      dm-ploop: rework bat completion logic
      dm-ploop: rework logic in pio processing
      dm-ploop: end fsync pios in parallel
      dm-ploop: make filespace preallocations async
      dm-ploop: resubmit enospc pios from dispatcher thread
      dm-ploop: dm-ploop: simplify discard completion
      dm-ploop: use GFP_ATOMIC instead of GFP_NOIO
      dm-ploop: fix locks used in mixed context
      dm-ploop: fix how current flags are managed inside threads
    
    Andrey Zhadchenko (13):
      dm-ploop: do not flush after metadata writes
      dm-ploop: set IOCB_DSYNC on all FUA requests
      dm-ploop: remove extra ploop_cluster_is_in_top_delta()
      dm-ploop: introduce per-md page locking
      dm-ploop: reduce BAT accesses on discard completion
      dm-ploop: simplify llseek
      dm-ploop: speed up ploop_prepare_bat_update()
      dm-ploop: make new allocations immediately visible in BAT
      dm-ploop: drop ploop_cluster_is_in_top_delta()
      dm-ploop: do not wait for BAT update for non-FUA requests
      dm-ploop: add delay for metadata writeback
      dm-ploop: submit all postponed metadata on REQ_OP_FLUSH
      dm-ploop: handle REQ_PREFLUSH
    
    Feature: dm-ploop: ploop target driver
---
 drivers/md/dm-ploop-map.c | 105 ++++++++++++++++++++++++----------------------
 1 file changed, 54 insertions(+), 51 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 7d5e2cf443ee..35d12fd54050 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -21,6 +21,8 @@
 #include "dm-rq.h"
 
 static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio);
+static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
+                                    blk_status_t bi_status);
 
 #define PREALLOC_SIZE (128ULL * 1024 * 1024)
 
@@ -900,6 +902,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop 
*ploop,
        if (wait_llist_pending) {
                llist_for_each_safe(pos, t, wait_llist_pending) {
                        pio = list_entry((struct list_head *)pos, typeof(*pio), 
list);
+                       INIT_LIST_HEAD(&pio->list);
                        list_add(&pio->list, &list);
                }
        }
@@ -915,28 +918,31 @@ static void ploop_free_piwb(struct ploop_index_wb *piwb)
        kfree(piwb);
 }
 
+
+static void ploop_bat_write_finish(struct pio *pio, void *piwb_ptr,
+                                    blk_status_t bi_status);
 static void ploop_put_piwb(struct ploop_index_wb *piwb)
 {
        if (atomic_dec_and_test(&piwb->count)) {
-               struct ploop *ploop = piwb->ploop;
-               /*
-                * Index wb failed. Mark clusters as unallocated again.
-                * piwb->count is zero, so all data writers compeleted.
-                */
-               if (piwb->bi_status)
-                       ploop_advance_local_after_bat_wb(ploop, piwb, false);
+
+               ploop_bat_write_finish(piwb->pio, piwb, piwb->bi_status);
 
                if (piwb->comp) {
                        if (piwb->comp_bi_status)
                                *piwb->comp_bi_status = piwb->bi_status;
                        complete(piwb->comp);
                }
+               /*
+                * Status is set from first call to ploop_bat_write_complete
+                * zero keeps it as is
+                */
+
                ploop_free_piwb(piwb);
        }
 }
 
 /* This handler is called after BAT is updated. */
-static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
+static void ploop_bat_write_finish(struct pio *pio, void *piwb_ptr,
                                     blk_status_t bi_status)
 {
        struct ploop_index_wb *piwb = piwb_ptr;
@@ -950,30 +956,30 @@ static void ploop_bat_write_complete(struct pio *pio, 
void *piwb_ptr,
        struct llist_node *pos, *t;
        struct llist_node *ll_cow_pios;
        struct llist_node *ll_ready_pios;
-       int completed = atomic_read(&piwb->count) == 1;
-
-       if (completed) {
-               /* We are the last count so it is safe to advance bat */
-               if (!bi_status) {
-                       /*
-                        * Success: now update local BAT copy. We could do this
-                        * from our delayed work, but we want to publish new
-                        * mapping in the fastest way. This must be done before
-                        * data bios completion, since right after we complete
-                        * a bio, subsequent read wants to see written data
-                        * (ploop_map() wants to see not zero bat_entries[.]).
-                        */
-                       ploop_advance_local_after_bat_wb(ploop, piwb, true);
-               }
+
+       if (!bi_status) {
+               /*
+                * Success: now update local BAT copy. We could do this
+                * from our delayed work, but we want to publish new
+                * mapping in the fastest way. This must be done before
+                * data bios completion, since right after we complete
+                * a bio, subsequent read wants to see written data
+                * (ploop_map() wants to see not zero bat_entries[.]).
+                */
+               ploop_advance_local_after_bat_wb(ploop, piwb, true);
+       } else {
+               /*
+                * Index wb failed. Mark clusters as unallocated again.
+                * piwb->count is zero, so all data writers compeleted.
+                */
+               ploop_advance_local_after_bat_wb(ploop, piwb, false);
        }
 
        spin_lock_irqsave(&piwb->lock, flags);
-       if (completed)
-               piwb->completed = completed;
-       piwb->bi_status = bi_status;
-       ll_ready_pios = 
llist_reverse_order(llist_del_all(&piwb->llready_data_pios));
+       ll_ready_pios = llist_del_all(&piwb->llready_data_pios);
+       if (bi_status)
+               piwb->bi_status = bi_status;
        spin_unlock_irqrestore(&piwb->lock, flags);
-
        ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist));
 
        /*
@@ -981,8 +987,8 @@ static void ploop_bat_write_complete(struct pio *pio, void 
*piwb_ptr,
         */
 
        llist_for_each_safe(pos, t, ll_ready_pios) {
-               pio = list_entry((struct list_head *)pos, typeof(*aux_pio), 
list);
-               INIT_LIST_HEAD(&pio->list);
+               data_pio = list_entry((struct list_head *)pos, 
typeof(*data_pio), list);
+               INIT_LIST_HEAD(&data_pio->list);
                if (bi_status)
                        data_pio->bi_status = bi_status;
                ploop_pio_endio(data_pio);
@@ -1000,11 +1006,17 @@ static void ploop_bat_write_complete(struct pio *pio, 
void *piwb_ptr,
                        ploop_dispatch_pios(ploop, flush_pio, NULL);
                piwb->flush_pio = NULL;
        }
+}
+
+static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
+                                    blk_status_t bi_status)
+
+{
+       struct ploop_index_wb *piwb = piwb_ptr;
+
+       if (bi_status)
+               piwb->bi_status = bi_status;
 
-       /*
-        * In case of update BAT is failed, dst_clusters will be
-        * set back to holes_bitmap on last put_piwb().
-        */
        ploop_put_piwb(piwb);
 }
 
@@ -1032,7 +1044,6 @@ static int ploop_prepare_bat_update(struct ploop *ploop, 
struct md_page *md,
        if (!page || !pio)
                goto err;
        ploop_init_pio(ploop, REQ_OP_WRITE, pio);
-
        bat_entries = md->kmpage;
 
        spin_lock(&md->md_lock); /* write */
@@ -1260,23 +1271,19 @@ static int ploop_alloc_cluster(struct ploop *ploop, 
struct ploop_index_wb *piwb,
        return ret;
 }
 
-static bool ploop_data_pio_end(struct pio *pio)
+static void ploop_data_pio_end(struct pio *pio)
 {
        struct ploop_index_wb *piwb = pio->piwb;
        unsigned long flags;
-       bool completed;
 
        spin_lock_irqsave(&piwb->lock, flags);
-       completed = piwb->completed;
-       if (!completed)
-               llist_add((struct llist_node *)(&pio->list), 
&piwb->llready_data_pios);
-       else if (!pio->bi_status)
+       llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios);
+       if (!pio->bi_status)
                pio->bi_status = piwb->bi_status;
-       spin_unlock_irqrestore(&piwb->lock, flags);
 
+       /* If pio is late then end it here. this can happen with flushes */
+       spin_unlock_irqrestore(&piwb->lock, flags);
        ploop_put_piwb(piwb);
-
-       return completed;
 }
 
 static void ploop_attach_end_action(struct pio *pio, struct ploop_index_wb 
*piwb)
@@ -1320,8 +1327,6 @@ static void ploop_check_standby_mode(struct ploop *ploop, 
long res)
 
 static void ploop_data_rw_complete(struct pio *pio)
 {
-       bool completed;
-
        if (pio->ret != pio->bi_iter.bi_size) {
                if (pio->ret >= 0 || pio->ret == -ENOTBLK) {
                        /* Partial IO or request to retry in buffered mode */
@@ -1356,12 +1361,10 @@ static void ploop_data_rw_complete(struct pio *pio)
        }
 check_da:
        if (pio->is_data_alloc) {
-               completed = ploop_data_pio_end(pio);
-               if (!completed)
-                       return;
+               ploop_data_pio_end(pio);
+       } else {
+               ploop_pio_endio(pio);
        }
-
-       ploop_pio_endio(pio);
 }
 
 /*
_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to