A data pio may complete after the md page update has been written. In that case we must not complete the update immediately, but wait for the last data pio and only then complete them all.
https://virtuozzo.atlassian.net/browse/VSTOR-91821 Signed-off-by: Alexander Atanasov <alexander.atana...@virtuozzo.com> --- drivers/md/dm-ploop-map.c | 105 ++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 51 deletions(-) diff --git a/drivers/md/dm-ploop-map.c b/drivers/md/dm-ploop-map.c index 7d5e2cf443ee..35d12fd54050 100644 --- a/drivers/md/dm-ploop-map.c +++ b/drivers/md/dm-ploop-map.c @@ -21,6 +21,8 @@ #include "dm-rq.h" static inline int ploop_runners_add_work(struct ploop *ploop, struct pio *pio); +static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr, + blk_status_t bi_status); #define PREALLOC_SIZE (128ULL * 1024 * 1024) @@ -900,6 +902,7 @@ static void ploop_advance_local_after_bat_wb(struct ploop *ploop, if (wait_llist_pending) { llist_for_each_safe(pos, t, wait_llist_pending) { pio = list_entry((struct list_head *)pos, typeof(*pio), list); + INIT_LIST_HEAD(&pio->list); list_add(&pio->list, &list); } } @@ -915,28 +918,31 @@ static void ploop_free_piwb(struct ploop_index_wb *piwb) kfree(piwb); } + +static void ploop_bat_write_finish(struct pio *pio, void *piwb_ptr, + blk_status_t bi_status); static void ploop_put_piwb(struct ploop_index_wb *piwb) { if (atomic_dec_and_test(&piwb->count)) { - struct ploop *ploop = piwb->ploop; - /* - * Index wb failed. Mark clusters as unallocated again. - * piwb->count is zero, so all data writers compeleted. - */ - if (piwb->bi_status) - ploop_advance_local_after_bat_wb(ploop, piwb, false); + + ploop_bat_write_finish(piwb->pio, piwb, piwb->bi_status); if (piwb->comp) { if (piwb->comp_bi_status) *piwb->comp_bi_status = piwb->bi_status; complete(piwb->comp); } + /* + * Status is set from first call to ploop_bat_write_complete + * zero keeps it as is + */ + ploop_free_piwb(piwb); } } /* This handler is called after BAT is updated. 
*/ -static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr, +static void ploop_bat_write_finish(struct pio *pio, void *piwb_ptr, blk_status_t bi_status) { struct ploop_index_wb *piwb = piwb_ptr; @@ -950,30 +956,30 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr, struct llist_node *pos, *t; struct llist_node *ll_cow_pios; struct llist_node *ll_ready_pios; - int completed = atomic_read(&piwb->count) == 1; - - if (completed) { - /* We are the last count so it is safe to advance bat */ - if (!bi_status) { - /* - * Success: now update local BAT copy. We could do this - * from our delayed work, but we want to publish new - * mapping in the fastest way. This must be done before - * data bios completion, since right after we complete - * a bio, subsequent read wants to see written data - * (ploop_map() wants to see not zero bat_entries[.]). - */ - ploop_advance_local_after_bat_wb(ploop, piwb, true); - } + + if (!bi_status) { + /* + * Success: now update local BAT copy. We could do this + * from our delayed work, but we want to publish new + * mapping in the fastest way. This must be done before + * data bios completion, since right after we complete + * a bio, subsequent read wants to see written data + * (ploop_map() wants to see not zero bat_entries[.]). + */ + ploop_advance_local_after_bat_wb(ploop, piwb, true); + } else { + /* + * Index wb failed. Mark clusters as unallocated again. + * piwb->count is zero, so all data writers compeleted. 
+ */ + ploop_advance_local_after_bat_wb(ploop, piwb, false); } spin_lock_irqsave(&piwb->lock, flags); - if (completed) - piwb->completed = completed; - piwb->bi_status = bi_status; - ll_ready_pios = llist_reverse_order(llist_del_all(&piwb->llready_data_pios)); + ll_ready_pios = llist_del_all(&piwb->llready_data_pios); + if (bi_status) + piwb->bi_status = bi_status; spin_unlock_irqrestore(&piwb->lock, flags); - ll_cow_pios = llist_reverse_order(llist_del_all(&piwb->cow_llist)); /* @@ -981,8 +987,8 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr, */ llist_for_each_safe(pos, t, ll_ready_pios) { - pio = list_entry((struct list_head *)pos, typeof(*aux_pio), list); - INIT_LIST_HEAD(&pio->list); + data_pio = list_entry((struct list_head *)pos, typeof(*data_pio), list); + INIT_LIST_HEAD(&data_pio->list); if (bi_status) data_pio->bi_status = bi_status; ploop_pio_endio(data_pio); @@ -1000,11 +1006,17 @@ static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr, ploop_dispatch_pios(ploop, flush_pio, NULL); piwb->flush_pio = NULL; } +} + +static void ploop_bat_write_complete(struct pio *pio, void *piwb_ptr, + blk_status_t bi_status) + +{ + struct ploop_index_wb *piwb = piwb_ptr; + + if (bi_status) + piwb->bi_status = bi_status; - /* - * In case of update BAT is failed, dst_clusters will be - * set back to holes_bitmap on last put_piwb(). 
- */ ploop_put_piwb(piwb); } @@ -1032,7 +1044,6 @@ static int ploop_prepare_bat_update(struct ploop *ploop, struct md_page *md, if (!page || !pio) goto err; ploop_init_pio(ploop, REQ_OP_WRITE, pio); - bat_entries = md->kmpage; spin_lock(&md->md_lock); /* write */ @@ -1260,23 +1271,19 @@ static int ploop_alloc_cluster(struct ploop *ploop, struct ploop_index_wb *piwb, return ret; } -static bool ploop_data_pio_end(struct pio *pio) +static void ploop_data_pio_end(struct pio *pio) { struct ploop_index_wb *piwb = pio->piwb; unsigned long flags; - bool completed; spin_lock_irqsave(&piwb->lock, flags); - completed = piwb->completed; - if (!completed) - llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios); - else if (!pio->bi_status) + llist_add((struct llist_node *)(&pio->list), &piwb->llready_data_pios); + if (!pio->bi_status) pio->bi_status = piwb->bi_status; - spin_unlock_irqrestore(&piwb->lock, flags); + /* If pio is late then end it here. this can happen with flushes */ + spin_unlock_irqrestore(&piwb->lock, flags); ploop_put_piwb(piwb); - - return completed; } static void ploop_attach_end_action(struct pio *pio, struct ploop_index_wb *piwb) @@ -1320,8 +1327,6 @@ static void ploop_check_standby_mode(struct ploop *ploop, long res) static void ploop_data_rw_complete(struct pio *pio) { - bool completed; - if (pio->ret != pio->bi_iter.bi_size) { if (pio->ret >= 0 || pio->ret == -ENOTBLK) { /* Partial IO or request to retry in buffered mode */ @@ -1356,12 +1361,10 @@ static void ploop_data_rw_complete(struct pio *pio) } check_da: if (pio->is_data_alloc) { - completed = ploop_data_pio_end(pio); - if (!completed) - return; + ploop_data_pio_end(pio); + } else { + ploop_pio_endio(pio); } - - ploop_pio_endio(pio); } /* -- 2.43.0 _______________________________________________ Devel mailing list Devel@openvz.org https://lists.openvz.org/mailman/listinfo/devel