On 12/5/24 22:56, Alexander Atanasov wrote:
A pio may complete after the md page update; in that case we
must not complete the update but wait for the last data pio
and only then complete them all.

https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atana...@virtuozzo.com>
---
  drivers/md/dm-ploop-map.c | 57 ++++++++++++++++++++++++++-------------
  drivers/md/dm-ploop.h     |  1 +
  2 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 482022d6b60b..9da996935945 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -598,7 +598,6 @@ static void ploop_unlink_completed_pio(struct ploop *ploop, 
struct pio *pio)
static bool ploop_md_make_dirty(struct ploop *ploop, struct md_page *md)
  {
-       unsigned long flags;
        bool new = false;
/*
@@ -896,6 +895,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop 
*ploop,
        if (wait_llist_pending) {
                llist_for_each_safe(pos, t, wait_llist_pending) {
                        pio = list_entry((struct list_head *)pos, typeof(*pio), 
list);
+                       INIT_LIST_HEAD(&pio->list);
                        list_add(&pio->list, &list);
                }
        }
@@ -962,15 +962,22 @@ static void ploop_bat_write_complete(struct pio *pio, 
void *piwb_ptr,
                         */
                        ploop_advance_local_after_bat_wb(ploop, piwb, true);
                }
+       } else {
+               PL_ERR("piwb not completed\n");
        }
spin_lock_irqsave(&piwb->lock, flags);
-       if (completed)
+       if (completed) {
                piwb->completed = completed;
-       piwb->bi_status = bi_status;
-       ll_ready_pios = 
llist_reverse_order(llist_del_all(&piwb->llready_data_pios));
+               ll_ready_pios = llist_del_all(&piwb->llready_data_pios);
+       } else {
+               piwb->wait_for_last = 1;
+       }
+       if (bi_status)
+               piwb->bi_status = bi_status;
        spin_unlock_irqrestore(&piwb->lock, flags);
-
+       if (!completed)
+               return;
        ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist));
/*
@@ -978,8 +985,8 @@ static void ploop_bat_write_complete(struct pio *pio, void 
*piwb_ptr,
         */
llist_for_each_safe(pos, t, ll_ready_pios) {
-               pio = list_entry((struct list_head *)pos, typeof(*aux_pio), 
list);
-               INIT_LIST_HEAD(&pio->list);
+               data_pio = list_entry((struct list_head *)pos, 
typeof(*data_pio), list);
+               INIT_LIST_HEAD(&data_pio->list);
                if (bi_status)
                        data_pio->bi_status = bi_status;
                ploop_pio_endio(data_pio);
@@ -1026,9 +1033,10 @@ static int ploop_prepare_bat_update(struct ploop *ploop, 
struct md_page *md,
        piwb->pio = pio = ploop_alloc_pio(ploop, GFP_NOIO);
        if (!page || !pio)
                goto err;
-       piwb->kmpage = kmap(piwb->bat_page);
        ploop_init_pio(ploop, REQ_OP_WRITE, pio);
-
+       piwb->kmpage = kmap(piwb->bat_page);
+       WARN_ON(!piwb->kmpage);
+       piwb->wait_for_last = 0;
        bat_entries = md->kmpage;
spin_lock_irq(&md->md_lock);
@@ -1251,6 +1259,7 @@ static bool ploop_data_pio_end(struct pio *pio)
        struct ploop_index_wb *piwb = pio->piwb;
        unsigned long flags;
        bool completed;
+       int last_pio = 0;
spin_lock_irqsave(&piwb->lock, flags);
        completed = piwb->completed;
@@ -1258,12 +1267,26 @@ static bool ploop_data_pio_end(struct pio *pio)
                llist_add((struct llist_node *)(&pio->list), 
&piwb->llready_data_pios);
        else if (!pio->bi_status)
                pio->bi_status = piwb->bi_status;
-       spin_unlock_irqrestore(&piwb->lock, flags);
        /* If pio is late then end it here. this can happen with flushes */
-       if (completed)
+       if (atomic_read(&piwb->count) == 2) {

I think this is really complicated. Why not just call ploop_bat_write_complete() when count reaches zero (and decrement count on data pio end or metadata pio end)? In the current scheme I think we will explode if this code is run in parallel. Imagine the md write is complete, but several data pios end simultaneously and execute this in parallel. They check that count is not 2 (for example, there are 3 parallel data pios). Now all of them have last_pio == 0. Then all 3 go to ploop_put_piwb(). The piwb count is now zero and no one has called ploop_bat_write_complete(). piwb->lock didn't help us, since there may still be parallel execution between the unlock and the piwb->count decrement inside ploop_put_piwb().

But maybe I am missing something.

+               if (piwb->wait_for_last)
+                       last_pio = 1;
+       }
+       spin_unlock_irqrestore(&piwb->lock, flags);
+       ploop_put_piwb(piwb);
+       if (completed) {
+               struct ploop *ploop = pio->ploop;
+
+               PL_ERR("late pio already completed\n");
                ploop_pio_endio(pio);
+       } else if (last_pio) {
+               /*
+                * Status is set from first call to ploop_bat_write_complete
+                * zero keeps it as is
+                */
+               ploop_bat_write_complete(piwb->pio, piwb, piwb->bi_status);
+       }
- ploop_put_piwb(piwb); return completed;
  }
@@ -1309,8 +1332,6 @@ static void ploop_check_standby_mode(struct ploop *ploop, 
long res)
static void ploop_data_rw_complete(struct pio *pio)
  {
-       bool completed;
-
        if (pio->ret != pio->bi_iter.bi_size) {
                if (pio->ret >= 0 || pio->ret == -ENOTBLK) {
                        /* Partial IO or request to retry in buffered mode */
@@ -1338,12 +1359,10 @@ static void ploop_data_rw_complete(struct pio *pio)
                pio->bi_status = errno_to_blk_status(pio->ret);
        }
        if (pio->is_data_alloc) {
-               completed = ploop_data_pio_end(pio);
-               if (!completed)
-                       return;
+               ploop_data_pio_end(pio);
+       } else {
+               ploop_pio_endio(pio);
        }
-
-       ploop_pio_endio(pio);
  }
/*
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index 5ad2a5142521..cc66f9dcd27f 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -111,6 +111,7 @@ struct ploop_index_wb {
        u32 page_id;
        struct bio_vec aux_bvec;
        struct pio *flush_pio;
+       int wait_for_last;
  };
/* Metadata page */
_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to