On 20.12.24 16:23, Andrey Zhadchenko wrote:


On 12/5/24 22:56, Alexander Atanasov wrote:
A pio may complete after the md page update. In that case we
must not complete the update right away, but wait for the last data pio
and only then complete them all.

https://virtuozzo.atlassian.net/browse/VSTOR-91821
Signed-off-by: Alexander Atanasov <alexander.atana...@virtuozzo.com>
---
  drivers/md/dm-ploop-map.c | 57 ++++++++++++++++++++++++++-------------
  drivers/md/dm-ploop.h     |  1 +
  2 files changed, 39 insertions(+), 19 deletions(-)

diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 482022d6b60b..9da996935945 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -598,7 +598,6 @@ static void ploop_unlink_completed_pio(struct ploop *ploop, struct pio *pio)
  static bool ploop_md_make_dirty(struct ploop *ploop, struct md_page *md)
  {
-    unsigned long flags;
      bool new = false;
      /*
@@ -896,6 +895,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop *ploop,
      if (wait_llist_pending) {
          llist_for_each_safe(pos, t, wait_llist_pending) {
              pio = list_entry((struct list_head *)pos, typeof(*pio), list);
+            INIT_LIST_HEAD(&pio->list);
              list_add(&pio->list, &list);
          }
      }
@@ -962,15 +962,22 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
               */
              ploop_advance_local_after_bat_wb(ploop, piwb, true);
          }
+    } else {
+        PL_ERR("piwb not completed\n");
      }
      spin_lock_irqsave(&piwb->lock, flags);
-    if (completed)
+    if (completed) {
          piwb->completed = completed;
-    piwb->bi_status = bi_status;
-    ll_ready_pios = llist_reverse_order(llist_del_all(&piwb->llready_data_pios));
+        ll_ready_pios = llist_del_all(&piwb->llready_data_pios);
+    } else {
+        piwb->wait_for_last = 1;
+    }
+    if (bi_status)
+        piwb->bi_status = bi_status;
      spin_unlock_irqrestore(&piwb->lock, flags);
-
+    if (!completed)
+        return;
      ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist));
      /*
@@ -978,8 +985,8 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr,
       */
      llist_for_each_safe(pos, t, ll_ready_pios) {
-        pio = list_entry((struct list_head *)pos, typeof(*aux_pio), list);
-        INIT_LIST_HEAD(&pio->list);
+        data_pio = list_entry((struct list_head *)pos, typeof(*data_pio), list);
+        INIT_LIST_HEAD(&data_pio->list);
          if (bi_status)
              data_pio->bi_status = bi_status;
          ploop_pio_endio(data_pio);
@@ -1026,9 +1033,10 @@ static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md,
      piwb->pio = pio = ploop_alloc_pio(ploop, GFP_NOIO);
      if (!page || !pio)
          goto err;
-    piwb->kmpage = kmap(piwb->bat_page);
      ploop_init_pio(ploop, REQ_OP_WRITE, pio);
-
+    piwb->kmpage = kmap(piwb->bat_page);
+    WARN_ON(!piwb->kmpage);
+    piwb->wait_for_last = 0;
      bat_entries = md->kmpage;
      spin_lock_irq(&md->md_lock);
@@ -1251,6 +1259,7 @@ static bool ploop_data_pio_end(struct pio *pio)
      struct ploop_index_wb *piwb = pio->piwb;
      unsigned long flags;
      bool completed;
+    int last_pio = 0;
      spin_lock_irqsave(&piwb->lock, flags);
      completed = piwb->completed;
@@ -1258,12 +1267,26 @@ static bool ploop_data_pio_end(struct pio *pio)
          llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios);
      else if (!pio->bi_status)
          pio->bi_status = piwb->bi_status;
-    spin_unlock_irqrestore(&piwb->lock, flags);
      /* If pio is late then end it here. this can happen with flushes */
-    if (completed)
+    if (atomic_read(&piwb->count) == 2) {

I think this is really complicated. Why not just call ploop_bat_write_complete() when count reaches zero, decrementing count on each data pio end and on the metadata pio end? With the current scheme I think we will explode if this code runs in parallel. Imagine the md write is complete, but several data pios end simultaneously and execute this in parallel, for example 3 data pios. Each of them checks count and sees that it is not 2, so all of them have last_pio == 0. Then all 3 go to ploop_put_piwb(). The piwb count is now zero and no one has called ploop_bat_write_complete(). piwb->lock did not help us, since parallel execution is still possible between the unlock and the piwb->count decrement inside ploop_put_piwb().
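
The suggestion above can be sketched in user-space C11. This is only a minimal model under stated assumptions, not the dm-ploop code: struct piwb_model, piwb_model_init(), piwb_model_put() and the completions counter are hypothetical names, and atomic_fetch_sub() stands in for a kernel decrement-and-test primitive. The metadata write and every data pio each drop exactly one reference, and only the caller that observes the transition to zero runs completion, so the check and the decrement cannot be separated by a parallel caller.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical user-space model of a piwb; not the kernel structure. */
struct piwb_model {
	atomic_int count;       /* outstanding md write + data pios */
	atomic_int completions; /* how many times completion ran */
};

static void piwb_model_init(struct piwb_model *p, int nr_data_pios)
{
	/* One reference for the metadata write, one per data pio. */
	atomic_init(&p->count, nr_data_pios + 1);
	atomic_init(&p->completions, 0);
}

/* Stand-in for ploop_bat_write_complete(): only counts invocations. */
static void piwb_model_complete(struct piwb_model *p)
{
	atomic_fetch_add(&p->completions, 1);
}

/*
 * Called once by the md write end and once by each data pio end.
 * atomic_fetch_sub() returns the value held before the decrement,
 * so exactly one caller sees 1 and runs completion.
 * Returns true for that caller, false for everyone else.
 */
static bool piwb_model_put(struct piwb_model *p)
{
	int prev = atomic_fetch_sub(&p->count, 1);

	assert(prev > 0); /* more puts than references would be a bug */
	if (prev == 1) {
		piwb_model_complete(p);
		return true;
	}
	return false;
}
```

With three data pios, piwb_model_init(&p, 3) sets the count to 4. Whatever order the four piwb_model_put() calls arrive in, the first three return false and the fourth returns true, so completion runs exactly once.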


Count reaches zero in ploop_put_piwb() and the piwb is freed, so you mean count 1.
ploop_bat_write_complete() is complete if atomic_read(&piwb->count) == 1,
i.e. it is the last reference to the piwb, which is dropped on put.

We cannot declare the bat write complete and end the related pios until
all pios are complete. But we can get a call to ploop_bat_write_complete() before the last data pio ends, which is the case the code tries to catch. What synchronizes the situation you describe is piwb->wait_for_last: it is set and checked under a lock. It is set only if ploop_bat_write_complete() has already been called. So it needs to be called from ploop_data_pio_end(). But maybe you are right, and as a solution I see checking again in ploop_put_piwb() whether ploop_bat_write_complete() needs to be called.

I have to think about this.
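
One way to picture the wait_for_last alternative is the user-space sketch below. It is an assumption-laden model, not the patch: struct piwb_wfl, wfl_md_end(), wfl_data_end() and the atomic_flag spinlock are hypothetical stand-ins, and pending_data replaces the reference count. The point it illustrates is that the decrement and the "am I the last one?" decision happen under the same lock hold, so two data pios ending in parallel cannot both conclude that they are not last.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical user-space model; field names only echo the patch. */
struct piwb_wfl {
	atomic_flag lock;   /* tiny spinlock standing in for piwb->lock */
	int pending_data;   /* data pios that have not ended yet */
	bool wait_for_last; /* md write ended while data pios were pending */
	int completions;    /* times the completion step ran */
};

static void wfl_lock(struct piwb_wfl *p)
{
	while (atomic_flag_test_and_set_explicit(&p->lock, memory_order_acquire))
		; /* spin until the flag is released */
}

static void wfl_unlock(struct piwb_wfl *p)
{
	atomic_flag_clear_explicit(&p->lock, memory_order_release);
}

static void wfl_init(struct piwb_wfl *p, int nr_data_pios)
{
	atomic_flag_clear(&p->lock);
	p->pending_data = nr_data_pios;
	p->wait_for_last = false;
	p->completions = 0;
}

/* Metadata write ended: complete now, or defer to the last data pio. */
static bool wfl_md_end(struct piwb_wfl *p)
{
	bool run;

	wfl_lock(p);
	run = (p->pending_data == 0);
	if (run)
		p->completions++; /* stands in for the completion step */
	else
		p->wait_for_last = true;
	wfl_unlock(p);
	return run;
}

/* Data pio ended: decrement and last-pio check share one lock hold. */
static bool wfl_data_end(struct piwb_wfl *p)
{
	bool run;

	wfl_lock(p);
	assert(p->pending_data > 0);
	p->pending_data--;
	run = (p->pending_data == 0 && p->wait_for_last);
	if (run)
		p->completions++; /* stands in for the completion step */
	wfl_unlock(p);
	return run;
}
```

In this model, if the metadata write ends while data pios are still pending, wfl_md_end() returns false and only the final wfl_data_end() call returns true. Real code would likely run the completion work after dropping the lock; it is counted under the lock here only to keep the sketch short.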

--
Regards,
Alexander Atanasov

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel
