From: Andrey Zhadchenko <andrey.zhadche...@virtuozzo.com>

Currently we have a single bat_rwlock for the whole ploop. However,
runtime locking granularity can be reduced to a single metadata page.
In this patch, add an rwlock to the metadata structure and use it when
accessing md->bat_levels and md->page at the same time to protect
readers against writers.

https://virtuozzo.atlassian.net/browse/VSTOR-91817
Signed-off-by: Andrey Zhadchenko <andrey.zhadche...@virtuozzo.com>
---
 drivers/md/dm-ploop-bat.c | 14 +++++++-----
 drivers/md/dm-ploop-cmd.c | 48 ++++++++++++++++++++++++---------------
 drivers/md/dm-ploop-map.c | 16 ++++++-------
 drivers/md/dm-ploop.h     |  6 +++++
 4 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/drivers/md/dm-ploop-bat.c b/drivers/md/dm-ploop-bat.c
index 655d0e4c91ab..a6202720927f 100644
--- a/drivers/md/dm-ploop-bat.c
+++ b/drivers/md/dm-ploop-bat.c
@@ -88,6 +88,7 @@ static struct md_page *ploop_alloc_md_page(u32 id)
        md->page = page;
        md->kmpage = kmap(page);
        md->id = id;
+       rwlock_init(&md->lock);
        return md;
 err_page:
        kfree(levels);
@@ -134,20 +135,23 @@ bool ploop_try_update_bat_entry(struct ploop *ploop, u32 
clu, u8 level, u32 dst_
 {
        u32 *bat_entries, id = ploop_bat_clu_to_page_nr(clu);
        struct md_page *md = ploop_md_page_find(ploop, id);
-
-       lockdep_assert_held(&ploop->bat_rwlock);
+       unsigned long flags;
+       bool ret = false;
 
        if (!md)
                return false;
 
        clu = ploop_bat_clu_idx_in_page(clu); /* relative offset */
 
+       write_lock_irqsave(&md->lock, flags);
        if (READ_ONCE(md->bat_levels[clu]) == level) {
                bat_entries = md->kmpage;
                WRITE_ONCE(bat_entries[clu], dst_clu);
-               return true;
+               ret = true;
        }
-       return false;
+       write_unlock_irqrestore(&md->lock, flags);
+
+       return ret;
 }
 
 /* Alloc holes_bitmap and set bits of free clusters */
@@ -411,7 +415,6 @@ static void ploop_apply_delta_mappings(struct ploop *ploop,
        if (!is_raw)
                d_md = ploop_md_first_entry(md_root);
 
-       write_lock_irq(&ploop->bat_rwlock);
        ploop_for_each_md_page(ploop, md, node) {
                bat_entries = md->kmpage;
                if (!is_raw)
@@ -455,7 +458,6 @@ static void ploop_apply_delta_mappings(struct ploop *ploop,
                if (!is_raw)
                        d_md = ploop_md_next_entry(d_md);
        }
-       write_unlock_irq(&ploop->bat_rwlock);
 }
 
 int ploop_check_delta_length(struct ploop *ploop, struct file *file, loff_t 
*file_size)
diff --git a/drivers/md/dm-ploop-cmd.c b/drivers/md/dm-ploop-cmd.c
index 115efdf092d3..ac477be0d2eb 100644
--- a/drivers/md/dm-ploop-cmd.c
+++ b/drivers/md/dm-ploop-cmd.c
@@ -27,6 +27,7 @@ static void ploop_advance_holes_bitmap(struct ploop *ploop,
        u32 i, end, size, dst_clu, *bat_entries;
        struct rb_node *node;
        struct md_page *md;
+       unsigned long flags;
 
        /* This is called only once */
        if (cmd->resize.stage != PLOOP_GROW_STAGE_INITIAL)
@@ -44,6 +45,8 @@ static void ploop_advance_holes_bitmap(struct ploop *ploop,
        ploop_for_each_md_page(ploop, md, node) {
                ploop_init_be_iter(ploop, md->id, &i, &end);
                bat_entries = md->kmpage;
+
+               read_lock_irqsave(&md->lock, flags);
                for (; i <= end; i++) {
                        if (!ploop_md_page_cluster_is_in_top_delta(ploop, md, 
i))
                                continue;
@@ -54,6 +57,7 @@ static void ploop_advance_holes_bitmap(struct ploop *ploop,
                                ploop_hole_clear_bit(dst_clu, ploop);
                        }
                }
+               read_unlock_irqrestore(&md->lock, flags);
        }
        write_unlock_irq(&ploop->bat_rwlock);
 }
@@ -157,11 +161,13 @@ static u32 ploop_find_bat_entry(struct ploop *ploop, u32 
dst_clu, bool *is_locke
        u32 i, end, *bat_entries, clu = U32_MAX;
        struct rb_node *node;
        struct md_page *md;
+       unsigned long flags;
 
-       read_lock_irq(&ploop->bat_rwlock);
        ploop_for_each_md_page(ploop, md, node) {
                ploop_init_be_iter(ploop, md->id, &i, &end);
                bat_entries = md->kmpage;
+
+               read_lock_irqsave(&md->lock, flags);
                for (; i <= end; i++) {
                        if (READ_ONCE(bat_entries[i]) != dst_clu)
                                continue;
@@ -170,10 +176,10 @@ static u32 ploop_find_bat_entry(struct ploop *ploop, u32 
dst_clu, bool *is_locke
                                break;
                        }
                }
+               read_unlock_irqrestore(&md->lock, flags);
                if (clu != UINT_MAX)
                        break;
        }
-       read_unlock_irq(&ploop->bat_rwlock);
 
        *is_locked = false;
        if (clu != UINT_MAX) {
@@ -342,10 +348,8 @@ static int ploop_grow_relocate_cluster(struct ploop *ploop,
        }
 
        /* Update local BAT copy */
-       write_lock_irq(&ploop->bat_rwlock);
        WARN_ON(!ploop_try_update_bat_entry(ploop, clu,
                        ploop_top_level(ploop), new_dst));
-       write_unlock_irq(&ploop->bat_rwlock);
 not_occupied:
        /*
         * Now dst_clu is not referenced in BAT, so increase the value
@@ -695,12 +699,10 @@ static int ploop_merge_latest_snapshot(struct ploop 
*ploop)
        if (ret)
                goto out;
 
-       write_lock_irq(&ploop->bat_rwlock);
        level = ploop->nr_deltas - 2;
        file = ploop->deltas[level].file;
        ploop->deltas[level] = ploop->deltas[level + 1];
        ploop->nr_deltas--;
-       write_unlock_irq(&ploop->bat_rwlock);
        fput(file);
 
        ploop_resume_submitting_pios(ploop);
@@ -718,15 +720,19 @@ static void notify_delta_merged(struct ploop *ploop, u8 
level,
        struct rb_node *node;
        struct file *file;
        bool stop = false;
+       unsigned long flags;
        u32 clu;
 
        d_md = ploop_md_first_entry(md_root);
 
-       write_lock_irq(&ploop->bat_rwlock);
        ploop_for_each_md_page(ploop, md, node) {
                init_be_iter(nr_be, md->id, &i, &end);
                bat_entries = md->kmpage;
                d_bat_entries = d_md->kmpage;
+
+               write_lock_irqsave(&md->lock, flags);
+               write_lock(&d_md->lock);
+
                for (; i <= end; i++) {
                        clu = ploop_page_clu_idx_to_bat_clu(md->id, i);
                        if (clu == nr_be - 1)
@@ -759,6 +765,10 @@ static void notify_delta_merged(struct ploop *ploop, u8 
level,
                        else
                                WRITE_ONCE(md->bat_levels[i], level);
                }
+
+               write_unlock(&d_md->lock);
+               write_unlock_irqrestore(&md->lock, flags);
+
                if (stop)
                        break;
                d_md = ploop_md_next_entry(d_md);
@@ -770,7 +780,6 @@ static void notify_delta_merged(struct ploop *ploop, u8 
level,
                ploop->deltas[i - 1] = ploop->deltas[i];
        memset(&ploop->deltas[--ploop->nr_deltas], 0,
               sizeof(struct ploop_delta));
-       write_unlock_irq(&ploop->bat_rwlock);
        fput(file);
 }
 
@@ -781,7 +790,6 @@ static int ploop_process_update_delta_index(struct ploop 
*ploop, u8 level,
        u32 clu, dst_clu, n;
        int ret;
 
-       write_lock_irq(&ploop->bat_rwlock);
        /* Check all */
        while (sscanf(map, "%u:%u;%n", &clu, &dst_clu, &n) == 2) {
                /*
@@ -806,7 +814,6 @@ static int ploop_process_update_delta_index(struct ploop 
*ploop, u8 level,
        }
        ret = 0;
 unlock:
-       write_unlock_irq(&ploop->bat_rwlock);
        return ret;
 }
 ALLOW_ERROR_INJECTION(ploop_process_update_delta_index, ERRNO);
@@ -897,12 +904,9 @@ static int ploop_get_delta_name_cmd(struct ploop *ploop, 
u8 level,
 
        /*
         * Nobody can change deltas in parallel, since
-        * another cmds are prohibited, but do this
-        * for uniformity.
+        * other cmds are prohibited
         */
-       read_lock_irq(&ploop->bat_rwlock);
        file = get_file(ploop->deltas[level].file);
-       read_unlock_irq(&ploop->bat_rwlock);
 
        p = file_path(file, result, maxlen);
        if (p == ERR_PTR(-ENAMETOOLONG)) {
@@ -970,7 +974,11 @@ static int process_flip_upper_deltas(struct ploop *ploop)
         bat_clusters = DIV_ROUND_UP(size, CLU_SIZE(ploop));
        hb_nr = ploop->hb_nr;
 
-       write_lock_irq(&ploop->bat_rwlock);
+       /*
+        * We can be here only if ploop is suspended:
+        * no other IO nor command is possible
+        */
+
        /* Prepare holes_bitmap */
        memset(holes_bitmap, 0xff, hb_nr/8);
        for (i = (hb_nr & ~0x7); i < hb_nr; i++)
@@ -982,6 +990,7 @@ static int process_flip_upper_deltas(struct ploop *ploop)
        ploop_for_each_md_page(ploop, md, node) {
                ploop_init_be_iter(ploop, md->id, &i, &end);
                bat_entries = md->kmpage;
+
                for (; i <= end; i++) {
                        if (READ_ONCE(bat_entries[i]) == BAT_ENTRY_NONE)
                                continue;
@@ -996,7 +1005,6 @@ static int process_flip_upper_deltas(struct ploop *ploop)
 
        /* FIXME */
        swap(ploop->deltas[level], ploop->deltas[level+1]);
-       write_unlock_irq(&ploop->bat_rwlock);
        return 0;
 }
 
@@ -1035,6 +1043,7 @@ static int ploop_check_delta_before_flip(struct ploop 
*ploop, struct file *file)
        struct md_page *md, *d_md;
        struct rb_node *node;
        bool stop = false;
+       unsigned long flags;
 
        ret = ploop_read_delta_metadata(ploop, file, &md_root, &nr_be);
        if (ret) {
@@ -1045,10 +1054,12 @@ static int ploop_check_delta_before_flip(struct ploop 
*ploop, struct file *file)
        /* Points to hdr since md_page[0] also contains hdr. */
        d_md = ploop_md_first_entry(&md_root);
 
-       write_lock_irq(&ploop->bat_rwlock);
        ploop_for_each_md_page(ploop, md, node) {
                init_be_iter(nr_be, md->id, &i, &end);
                d_bat_entries = d_md->kmpage;
+
+               read_lock_irqsave(&md->lock, flags);
+               read_lock(&d_md->lock);
                for (; i <= end; i++) {
                        if (ploop_md_page_cluster_is_in_top_delta(ploop, md, i) 
&&
                            d_bat_entries[i] != BAT_ENTRY_NONE) {
@@ -1057,6 +1068,8 @@ static int ploop_check_delta_before_flip(struct ploop 
*ploop, struct file *file)
                                goto unmap;
                        }
                }
+               read_unlock(&d_md->lock);
+               read_unlock_irqrestore(&md->lock, flags);
 
                clu = ploop_page_clu_idx_to_bat_clu(md->id, i);
                if (clu == nr_be - 1) {
@@ -1069,7 +1082,6 @@ static int ploop_check_delta_before_flip(struct ploop 
*ploop, struct file *file)
                d_md = ploop_md_next_entry(d_md);
        }
 
-       write_unlock_irq(&ploop->bat_rwlock);
        ploop_free_md_pages_tree(&md_root);
 out:
 #endif
diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c
index 75a2075ea58a..31ed8050008f 100644
--- a/drivers/md/dm-ploop-map.c
+++ b/drivers/md/dm-ploop-map.c
@@ -386,13 +386,13 @@ static bool ploop_delay_if_md_busy(struct ploop *ploop, 
struct md_page *md,
        WARN_ON_ONCE(!list_empty(&pio->list));
 
        /* lock protects piwb */
-       read_lock_irqsave(&ploop->bat_rwlock, flags);
+       read_lock_irqsave(&md->lock, flags);
        piwb = md->piwb;
        if (piwb && (piwb->type != type || test_bit(MD_WRITEBACK, 
&md->status))) {
                llist_add((struct llist_node *)(&pio->list), &md->wait_llist);
                busy = true;
        }
-       read_unlock_irqrestore(&ploop->bat_rwlock, flags);
+       read_unlock_irqrestore(&md->lock, flags);
 
        return busy;
 }
@@ -802,9 +802,9 @@ static void ploop_advance_local_after_bat_wb(struct ploop 
*ploop,
        WARN_ON_ONCE(!test_bit(MD_WRITEBACK, &md->status));
        clear_bit(MD_WRITEBACK, &md->status);
        /* protect piwb */
-       write_lock_irqsave(&ploop->bat_rwlock, flags);
+       write_lock_irqsave(&md->lock, flags);
        md->piwb = NULL;
-       write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+       write_unlock_irqrestore(&md->lock, flags);
 
        wait_llist_pending = llist_del_all(&md->wait_llist);
        if (wait_llist_pending) {
@@ -921,10 +921,10 @@ static int ploop_prepare_bat_update(struct ploop *ploop, 
struct md_page *md,
 
        bat_entries = md->kmpage;
 
-       write_lock_irq(&ploop->bat_rwlock);
+       write_lock_irq(&md->lock);
        md->piwb = piwb;
        piwb->md = md;
-       write_unlock_irq(&ploop->bat_rwlock);
+       write_unlock_irq(&md->lock);
 
        piwb->page_id = page_id;
        to = piwb->kmpage;
@@ -968,10 +968,10 @@ void ploop_break_bat_update(struct ploop *ploop, struct 
md_page *md)
        struct ploop_index_wb *piwb;
        unsigned long flags;
 
-       write_lock_irqsave(&ploop->bat_rwlock, flags);
+       write_lock_irqsave(&md->lock, flags);
        piwb = md->piwb;
        md->piwb = NULL;
-       write_unlock_irqrestore(&ploop->bat_rwlock, flags);
+       write_unlock_irqrestore(&md->lock, flags);
 
        ploop_free_piwb(piwb);
 }
diff --git a/drivers/md/dm-ploop.h b/drivers/md/dm-ploop.h
index a5eab5bad596..2fab968b12bf 100644
--- a/drivers/md/dm-ploop.h
+++ b/drivers/md/dm-ploop.h
@@ -125,6 +125,8 @@ struct md_page {
 
        struct llist_node wb_llink;
        struct ploop_index_wb *piwb;
+
+       rwlock_t lock;
 };
 
 enum {
@@ -425,6 +427,7 @@ static inline u32 ploop_bat_entries(struct ploop *ploop, 
u32 clu,
 {
        u32 *bat_entries, dst_clu, id;
        struct md_page *md;
+       unsigned long flags;
 
        id = ploop_bat_clu_to_page_nr(clu);
        md = ploop_md_page_find(ploop, id);
@@ -433,6 +436,7 @@ static inline u32 ploop_bat_entries(struct ploop *ploop, 
u32 clu,
        /* Cluster index related to the page[page_id] start */
        clu = ploop_bat_clu_idx_in_page(clu);
 
+       read_lock_irqsave(&md->lock, flags);
        if (bat_level)
                *bat_level = READ_ONCE(md->bat_levels[clu]);
        if (md_ret)
@@ -440,6 +444,8 @@ static inline u32 ploop_bat_entries(struct ploop *ploop, 
u32 clu,
 
        bat_entries = md->kmpage;
        dst_clu = READ_ONCE(bat_entries[clu]);
+       read_unlock_irqrestore(&md->lock, flags);
+
        return dst_clu;
 }
 
-- 
2.43.0

_______________________________________________
Devel mailing list
Devel@openvz.org
https://lists.openvz.org/mailman/listinfo/devel

Reply via email to