On 02/04/2016 02:08 PM, Javier González wrote:
> Flash controllers typically define flash pages as a collection of flash
> sectors of typically 4K. Moreover, flash controllers might program flash
> pages across several planes. This defines the write granularity at which
> flash can be programmed. This is different for each flash controller. In
> rrpc, the assumption has been that flash pages are 4K, thus writes could
> be sent out to the media as they were received.
> 
> This patch adds a per-block write buffer to rrpc. Writes are flushed to
> the media in chunks of the minimum granularity allowed by the
> controller, though the strategy can be changed to, for example, only
> flush at the maximum supported (64 pages in nvme). Apart from enabling
> the use of rrpc in real hardware, this buffering strategy will be used
> for recovery; if a block becomes bad, a new block can be allocated to
> persist valid data.

Great work. The code has nearly doubled in size. Can you add a detailed
explanation of the inner workings of the write buffering to help new
users understand its logic?

> 
> Signed-off-by: Javier González <jav...@cnexlabs.com>
> ---
>  drivers/lightnvm/rrpc.c  | 851 
> ++++++++++++++++++++++++++++++++++++++---------
>  drivers/lightnvm/rrpc.h  |  82 ++++-
>  include/linux/lightnvm.h |   6 +-
>  3 files changed, 771 insertions(+), 168 deletions(-)
> 
> diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
> index 8187bf3..e9fb19d 100644
> --- a/drivers/lightnvm/rrpc.c
> +++ b/drivers/lightnvm/rrpc.c
> @@ -16,11 +16,12 @@
>  
>  #include "rrpc.h"
>  
> -static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
> +static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
> +                     *rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
>  static DECLARE_RWSEM(rrpc_lock);
>  
>  static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> -                             struct nvm_rq *rqd, unsigned long flags);
> +                             struct rrpc_rq *rqd, unsigned long flags);
>  
>  #define rrpc_for_each_lun(rrpc, rlun, i) \
>               for ((i) = 0, rlun = &(rrpc)->luns[0]; \
> @@ -62,53 +63,59 @@ static void rrpc_invalidate_range(struct rrpc *rrpc, 
> sector_t slba,
>       spin_unlock(&rrpc->rev_lock);
>  }
>  
> -static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
> +static void rrpc_release_and_free_rrqd(struct kref *ref)
> +{
> +     struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
> +     struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;

Wow.. Happy that we got to rrpc :)

> +
> +     rrpc_unlock_rq(rrpc, rrqd);
> +     mempool_free(rrqd, rrpc->rrq_pool);
> +}
> +
> +static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
>                                       sector_t laddr, unsigned int pages)
>  {
> -     struct nvm_rq *rqd;
> +     struct rrpc_rq *rrqd;
>       struct rrpc_inflight_rq *inf;
>  
> -     rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> -     if (!rqd)
> +     rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
> +     if (!rrqd)
>               return ERR_PTR(-ENOMEM);
> +     kref_init(&rrqd->refs);
>  
> -     inf = rrpc_get_inflight_rq(rqd);
> +     inf = rrpc_get_inflight_rq(rrqd);
>       if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
> -             mempool_free(rqd, rrpc->rq_pool);
> +             mempool_free(rrqd, rrpc->rrq_pool);
>               return NULL;
>       }
>  
> -     return rqd;
> +     return rrqd;
>  }
>  
> -static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq 
> *rqd)
> +static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq 
> *rrqd)
>  {
> -     struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
> -
> -     rrpc_unlock_laddr(rrpc, inf);
> -
> -     mempool_free(rqd, rrpc->rq_pool);
> +     kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>  }
>  
>  static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
>  {
>       sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
>       sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
> -     struct nvm_rq *rqd;
> +     struct rrpc_rq *rrqd;
>  
>       do {
> -             rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
> +             rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
>               schedule();
> -     } while (!rqd);
> +     } while (!rrqd);
>  
> -     if (IS_ERR(rqd)) {
> +     if (IS_ERR(rrqd)) {
>               pr_err("rrpc: unable to acquire inflight IO\n");
>               bio_io_error(bio);
>               return;
>       }
>  
>       rrpc_invalidate_range(rrpc, slba, len);
> -     rrpc_inflight_laddr_release(rrpc, rqd);
> +     rrpc_inflight_laddr_release(rrpc, rrqd);
>  }
>  
>  static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
> @@ -166,8 +173,6 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, 
> struct rrpc_block *rblk)
>  {
>       struct rrpc *rrpc = rlun->rrpc;
>  
> -     BUG_ON(!rblk);
> -
>       if (rlun->cur) {
>               spin_lock(&rlun->cur->lock);
>               WARN_ON(!block_is_full(rrpc, rlun->cur));
> @@ -176,12 +181,107 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, 
> struct rrpc_block *rblk)
>       rlun->cur = rblk;
>  }
>  
> +static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +try:
> +     spin_lock(&rblk->w_buf.s_lock);
> +     if (atomic_read(&rblk->w_buf.refs) > 0) {
> +             spin_unlock(&rblk->w_buf.s_lock);
> +             schedule();
> +             goto try;
> +     }
> +
> +     mempool_free(rblk->w_buf.entries, rrpc->block_pool);
> +     rblk->w_buf.entries = NULL;
> +     spin_unlock(&rblk->w_buf.s_lock);
> +
> +     /* TODO: Reuse the same buffers if the block size is the same */
> +     mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
> +     kfree(rblk->w_buf.sync_bitmap);
> +
> +     rblk->w_buf.mem = NULL;
> +     rblk->w_buf.subm = NULL;
> +     rblk->w_buf.sync_bitmap = NULL;
> +     rblk->w_buf.data = NULL;
> +     rblk->w_buf.nentries = 0;
> +     rblk->w_buf.cur_mem = 0;
> +     rblk->w_buf.cur_subm = 0;
> +}
> +
> +static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +     struct rrpc_lun *rlun = rblk->rlun;
> +     struct nvm_lun *lun = rlun->parent;
> +     struct rrpc_w_buf *buf = &rblk->w_buf;
> +
> +try:
> +     spin_lock(&buf->w_lock);
> +     /* Flush inflight I/Os */
> +     if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
> +             spin_unlock(&buf->w_lock);
> +             schedule();
> +             goto try;
> +     }
> +     spin_unlock(&buf->w_lock);
> +
> +     if (rblk->w_buf.entries)
> +             rrpc_free_w_buffer(rrpc, rblk);
> +
> +     spin_lock(&lun->lock);
> +     nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> +     list_del(&rblk->list);
> +     spin_unlock(&lun->lock);
> +}
> +
> +static void rrpc_put_blks(struct rrpc *rrpc)
> +{
> +     struct rrpc_lun *rlun;
> +     int i;
> +
> +     for (i = 0; i < rrpc->nr_luns; i++) {
> +             rlun = &rrpc->luns[i];
> +             if (rlun->cur)
> +                     rrpc_put_blk(rrpc, rlun->cur);
> +             if (rlun->gc_cur)
> +                     rrpc_put_blk(rrpc, rlun->gc_cur);
> +     }
> +}
> +
>  static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun 
> *rlun,
>                                                       unsigned long flags)
>  {
> +     struct nvm_dev *dev = rrpc->dev;
>       struct nvm_lun *lun = rlun->parent;
>       struct nvm_block *blk;
>       struct rrpc_block *rblk;
> +     struct buf_entry *entries;
> +     unsigned long *sync_bitmap;
> +     void *data;
> +     int nentries = dev->pgs_per_blk * dev->sec_per_pg;
> +
> +     data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
> +     if (!data) {
> +             pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> +             return NULL;
> +     }
> +
> +     entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
> +     if (!entries) {
> +             pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> +             mempool_free(data, rrpc->write_buf_pool);
> +             return NULL;
> +     }
> +
> +     /* TODO: Mempool? */

This is not on the fast path. I don't think we need mempools here at
all; they would just reserve memory unnecessarily.

> +     sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
> +                                     sizeof(unsigned long), GFP_ATOMIC);
> +     if (!sync_bitmap) {
> +             mempool_free(data, rrpc->write_buf_pool);
> +             mempool_free(entries, rrpc->block_pool);
> +             return NULL;
> +     }
> +
> +     bitmap_zero(sync_bitmap, nentries);
>  
>       spin_lock(&lun->lock);
>       blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
> @@ -192,43 +292,34 @@ static struct rrpc_block *rrpc_get_blk(struct rrpc 
> *rrpc, struct rrpc_lun *rlun,
>       }
>  
>       rblk = &rlun->blocks[blk->id];
> -     list_add_tail(&rblk->list, &rlun->open_list);
> -     spin_unlock(&lun->lock);
>  
>       blk->priv = rblk;
> -     bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
> +     bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
>       rblk->next_page = 0;
>       rblk->nr_invalid_pages = 0;
> -     atomic_set(&rblk->data_cmnt_size, 0);
> +
> +     rblk->w_buf.data = data;
> +     rblk->w_buf.entries = entries;
> +     rblk->w_buf.sync_bitmap = sync_bitmap;
> +
> +     rblk->w_buf.entries->data  = rblk->w_buf.data;
> +     rblk->w_buf.mem = rblk->w_buf.entries;
> +     rblk->w_buf.subm = rblk->w_buf.entries;
> +     rblk->w_buf.nentries = nentries;
> +     rblk->w_buf.cur_mem = 0;
> +     rblk->w_buf.cur_subm = 0;
> +
> +     atomic_set(&rblk->w_buf.refs, 0);
> +
> +     spin_lock_init(&rblk->w_buf.w_lock);
> +     spin_lock_init(&rblk->w_buf.s_lock);
> +
> +     list_add_tail(&rblk->list, &rlun->open_list);
> +     spin_unlock(&lun->lock);
>  
>       return rblk;
>  }
>  
> -static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> -{
> -     struct rrpc_lun *rlun = rblk->rlun;
> -     struct nvm_lun *lun = rlun->parent;
> -
> -     spin_lock(&lun->lock);
> -     nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> -     list_del(&rblk->list);
> -     spin_unlock(&lun->lock);
> -}
> -
> -static void rrpc_put_blks(struct rrpc *rrpc)
> -{
> -     struct rrpc_lun *rlun;
> -     int i;
> -
> -     for (i = 0; i < rrpc->nr_luns; i++) {
> -             rlun = &rrpc->luns[i];
> -             if (rlun->cur)
> -                     rrpc_put_blk(rrpc, rlun->cur);
> -             if (rlun->gc_cur)
> -                     rrpc_put_blk(rrpc, rlun->gc_cur);
> -     }
> -}
> -
>  static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
>  {
>       int next = atomic_inc_return(&rrpc->next_lun);
> @@ -247,6 +338,17 @@ static void rrpc_gc_kick(struct rrpc *rrpc)
>       }
>  }
>  
> +static void rrpc_writer_kick(struct rrpc *rrpc)
> +{
> +     struct rrpc_lun *rlun;
> +     unsigned int i;
> +
> +     for (i = 0; i < rrpc->nr_luns; i++) {
> +             rlun = &rrpc->luns[i];
> +             queue_work(rrpc->kw_wq, &rlun->ws_writer);
> +     }
> +}
> +
>  /*
>   * timed GC every interval.
>   */
> @@ -282,7 +384,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, 
> struct rrpc_block *rblk)
>  {
>       struct request_queue *q = rrpc->dev->q;
>       struct rrpc_rev_addr *rev;
> -     struct nvm_rq *rqd;
> +     struct rrpc_rq *rrqd;
>       struct bio *bio;
>       struct page *page;
>       int slot;
> @@ -306,7 +408,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, 
> struct rrpc_block *rblk)
>       }
>  
>       while ((slot = find_first_zero_bit(rblk->invalid_pages,
> -                                         nr_pgs_per_blk)) < nr_pgs_per_blk) {
> +                                     nr_pgs_per_blk)) < nr_pgs_per_blk) {

No need to fix the indentation here.

>  
>               /* Lock laddr */
>               phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
> @@ -321,8 +423,8 @@ try:
>                       continue;
>               }
>  
> -             rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> -             if (IS_ERR_OR_NULL(rqd)) {
> +             rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> +             if (IS_ERR_OR_NULL(rrqd)) {
>                       spin_unlock(&rrpc->rev_lock);
>                       schedule();
>                       goto try;
> @@ -339,14 +441,14 @@ try:
>               /* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
>               bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
>  
> -             if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> +             if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
>                       pr_err("rrpc: gc read failed.\n");
> -                     rrpc_inflight_laddr_release(rrpc, rqd);
> +                     rrpc_inflight_laddr_release(rrpc, rrqd);
>                       goto finished;
>               }
>               wait_for_completion_io(&wait);
>               if (bio->bi_error) {
> -                     rrpc_inflight_laddr_release(rrpc, rqd);
> +                     rrpc_inflight_laddr_release(rrpc, rrqd);
>                       goto finished;
>               }
>  
> @@ -363,14 +465,16 @@ try:
>               /* turn the command around and write the data back to a new
>                * address
>                */
> -             if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> +             if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
> +                                                     != NVM_IO_DONE) {
>                       pr_err("rrpc: gc write failed.\n");
> -                     rrpc_inflight_laddr_release(rrpc, rqd);
> +                     rrpc_inflight_laddr_release(rrpc, rrqd);
>                       goto finished;
>               }
> +             bio_endio(bio);
>               wait_for_completion_io(&wait);
>  
> -             rrpc_inflight_laddr_release(rrpc, rqd);
> +             /* rrpc_inflight_laddr_release(rrpc, rrqd); */

This is commented out.

>               if (bio->bi_error)
>                       goto finished;
>  
> @@ -514,6 +618,8 @@ static void rrpc_gc_queue(struct work_struct *work)
>       list_move_tail(&rblk->list, &rlun->closed_list);
>       spin_unlock(&lun->lock);
>  
> +     rrpc_free_w_buffer(rrpc, rblk);
> +
>       mempool_free(gcb, rrpc->gcb_pool);
>       pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
>                                                       rblk->parent->id);
> @@ -663,43 +769,72 @@ static void rrpc_run_gc(struct rrpc *rrpc, struct 
> rrpc_block *rblk)
>       queue_work(rrpc->kgc_wq, &gcb->ws_gc);
>  }
>  
> -static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
> -                                             sector_t laddr, uint8_t npages)
> +static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
>  {
> -     struct rrpc_addr *p;
>       struct rrpc_block *rblk;
> +     struct rrpc_w_buf *buf;
>       struct nvm_lun *lun;
> -     int cmnt_size, i;
> +     unsigned long bppa;
>  
> -     for (i = 0; i < npages; i++) {
> -             p = &rrpc->trans_map[laddr + i];
> -             rblk = p->rblk;
> -             lun = rblk->parent->lun;
> +     rblk = p->rblk;
> +     buf = &rblk->w_buf;
> +     lun = rblk->parent->lun;
>  
> -             cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
> -             if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
> -                     rrpc_run_gc(rrpc, rblk);
> +     bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
> +
> +     WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
> +
> +     if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
> +             /* Write buffer out-of-bounds */
> +             WARN_ON((buf->cur_mem != buf->nentries) &&
> +                                     (buf->cur_mem != buf->cur_subm));
> +
> +             rrpc_run_gc(rrpc, rblk);
> +     }
> +}
> +
> +static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
> +                                                     uint8_t nr_pages)
> +{
> +     struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
> +     struct rrpc_rq *rrqd;
> +     int i;
> +
> +     for (i = 0; i < nr_pages; i++) {
> +             rrqd = brrqd[i].rrqd;
> +             rrpc_sync_buffer(rrpc, brrqd[i].addr);
> +             kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>       }
> +
> +     mempool_free(brrqd, rrpc->m_rrq_pool);
> +     rrpc_writer_kick(rrpc);
> +}
> +
> +static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
> +                                                     uint8_t nr_pages)
> +{
> +     struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +
> +     if (rqd->flags & NVM_IOTYPE_GC)
> +             return;
> +
> +     rrpc_unlock_rq(rrpc, rrqd);
> +     mempool_free(rrqd, rrpc->rrq_pool);
>  }
>  
>  static void rrpc_end_io(struct nvm_rq *rqd)
>  {
>       struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
> -     struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -     uint8_t npages = rqd->nr_pages;
> -     sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
> +     uint8_t nr_pages = rqd->nr_pages;
>  
>       if (bio_data_dir(rqd->bio) == WRITE)
> -             rrpc_end_io_write(rrpc, rrqd, laddr, npages);
> +             rrpc_end_io_write(rrpc, rqd, nr_pages);
> +     else
> +             rrpc_end_io_read(rrpc, rqd, nr_pages);
>  
>       bio_put(rqd->bio);
>  
> -     if (rrqd->flags & NVM_IOTYPE_GC)
> -             return;
> -
> -     rrpc_unlock_rq(rrpc, rqd);
> -
> -     if (npages > 1)
> +     if (nr_pages > 1)

Cleanup patches can go in afterwards or before.

>               nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>       if (rqd->metadata)
>               nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
> @@ -708,20 +843,24 @@ static void rrpc_end_io(struct nvm_rq *rqd)
>  }
>  
>  static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> -                     struct nvm_rq *rqd, unsigned long flags, int npages)
> +                     struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
> +                     unsigned long flags, int nr_pages)
>  {
> -     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +     struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>       struct rrpc_addr *gp;
>       sector_t laddr = rrpc_get_laddr(bio);
>       int is_gc = flags & NVM_IOTYPE_GC;
>       int i;
>  
> -     if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> +     if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>               nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +             mempool_free(rrqd, rrpc->rrq_pool);
> +             mempool_free(rqd, rrpc->rq_pool);

It seems there is a relationship here between rrq_pool (rrpc_rq) and
rq_pool (nvm_rq): they are freed together.

Could they be merged into a single mempool allocation (just keeping the
nvm_rq mempool)?



>               return NVM_IO_REQUEUE;
>       }
>  
> -     for (i = 0; i < npages; i++) {
> +     for (i = 0; i < nr_pages; i++) {
>               /* We assume that mapping occurs at 4KB granularity */
>               BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
>               gp = &rrpc->trans_map[laddr + i];
> @@ -734,8 +873,12 @@ static int rrpc_read_ppalist_rq(struct rrpc *rrpc, 
> struct bio *bio,
>                       rrpc_unlock_laddr(rrpc, r);
>                       nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
>                                                       rqd->dma_ppa_list);
> +                     mempool_free(rrqd, rrpc->rrq_pool);
> +                     mempool_free(rqd, rrpc->rq_pool);
>                       return NVM_IO_DONE;
>               }
> +
> +             brrqd[i].addr = gp;
>       }
>  
>       rqd->opcode = NVM_OP_HBREAD;
> @@ -751,8 +894,11 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio 
> *bio, struct nvm_rq *rqd,
>       sector_t laddr = rrpc_get_laddr(bio);
>       struct rrpc_addr *gp;
>  
> -     if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> +     if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +             mempool_free(rrqd, rrpc->rrq_pool);
> +             mempool_free(rqd, rrpc->rq_pool);
>               return NVM_IO_REQUEUE;
> +     }
>  
>       BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
>       gp = &rrpc->trans_map[laddr];
> @@ -761,7 +907,9 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio 
> *bio, struct nvm_rq *rqd,
>               rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
>       } else {
>               BUG_ON(is_gc);
> -             rrpc_unlock_rq(rrpc, rqd);
> +             rrpc_unlock_rq(rrpc, rrqd);
> +             mempool_free(rrqd, rrpc->rrq_pool);
> +             mempool_free(rqd, rrpc->rq_pool);
>               return NVM_IO_DONE;
>       }
>  
> @@ -771,120 +919,190 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio 
> *bio, struct nvm_rq *rqd,
>       return NVM_IO_OK;
>  }
>  
> +/*
> + * Copy data from current bio to block write buffer. This is necessary
> + * to guarantee durability if a flash block becomes bad before all pages
> + * are written. This buffer is also used to write at the right page
> + * granularity
> + */
> +static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
> +                             struct rrpc_rq *rrqd, struct rrpc_addr *addr,
> +                             struct rrpc_w_buf *w_buf,
> +                             unsigned long flags)
> +{
> +     struct nvm_dev *dev = rrpc->dev;
> +     unsigned int bio_len = bio_cur_bytes(bio);
> +
> +     if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
> +             return NVM_IO_ERR;
> +
> +     spin_lock(&w_buf->w_lock);
> +
> +     WARN_ON(w_buf->cur_mem == w_buf->nentries);
> +
> +     w_buf->mem->rrqd = rrqd;
> +     w_buf->mem->addr = addr;
> +     w_buf->mem->flags = flags;
> +
> +     memcpy(w_buf->mem->data, bio_data(bio), bio_len);
> +
> +     w_buf->cur_mem++;
> +     if (likely(w_buf->cur_mem < w_buf->nentries)) {
> +             w_buf->mem++;
> +             w_buf->mem->data =
> +                             w_buf->data + (w_buf->cur_mem * dev->sec_size);
> +     }
> +
> +     spin_unlock(&w_buf->w_lock);
> +
> +     return 0;
> +}
> +
>  static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> -                     struct nvm_rq *rqd, unsigned long flags, int npages)
> +                     struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
>  {
> -     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> +     struct rrpc_w_buf *w_buf;
>       struct rrpc_addr *p;
> +     struct rrpc_lun *rlun;
>       sector_t laddr = rrpc_get_laddr(bio);
>       int is_gc = flags & NVM_IOTYPE_GC;
> +     int err;
>       int i;
>  
> -     if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> -             nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +     if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +             mempool_free(rrqd, rrpc->rrq_pool);
>               return NVM_IO_REQUEUE;
>       }
>  
> -     for (i = 0; i < npages; i++) {
> +     for (i = 0; i < nr_pages; i++) {
> +             kref_get(&rrqd->refs);
> +
>               /* We assume that mapping occurs at 4KB granularity */
>               p = rrpc_map_page(rrpc, laddr + i, is_gc);
>               if (!p) {
>                       BUG_ON(is_gc);
>                       rrpc_unlock_laddr(rrpc, r);
> -                     nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
> -                                                     rqd->dma_ppa_list);
> +                     mempool_free(rrqd, rrpc->rrq_pool);
>                       rrpc_gc_kick(rrpc);
>                       return NVM_IO_REQUEUE;
>               }
>  
> -             rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
> -                                                             p->addr);
> +             w_buf = &p->rblk->w_buf;
> +             rlun = p->rblk->rlun;
> +
> +             rrqd->addr = p;
> +
> +             err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> +             if (err) {
> +                     pr_err("rrpc: could not write to write buffer\n");
> +                     return err;
> +             }
> +
> +             bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
> +
> +             queue_work(rrpc->kw_wq, &rlun->ws_writer);

How about using a timer, and only kicking the writer when a flush
command is sent?

>       }
>  
> -     rqd->opcode = NVM_OP_HBWRITE;
> +     if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
> +             pr_err("rrpc: request reference counter dailed\n");
> +             return NVM_IO_ERR;
> +     }
>  
> -     return NVM_IO_OK;
> +     return NVM_IO_DONE;
>  }
>  
>  static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
> -                             struct nvm_rq *rqd, unsigned long flags)
> +                             struct rrpc_rq *rrqd, unsigned long flags)
>  {
> -     struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +     struct rrpc_w_buf *w_buf;
>       struct rrpc_addr *p;
> +     struct rrpc_lun *rlun;
>       int is_gc = flags & NVM_IOTYPE_GC;
> +     int err;
>       sector_t laddr = rrpc_get_laddr(bio);
>  
> -     if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> +     if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +             mempool_free(rrqd, rrpc->rrq_pool);
>               return NVM_IO_REQUEUE;
> +     }
>  
>       p = rrpc_map_page(rrpc, laddr, is_gc);
>       if (!p) {
>               BUG_ON(is_gc);
> -             rrpc_unlock_rq(rrpc, rqd);
> +             rrpc_unlock_rq(rrpc, rrqd);
> +             mempool_free(rrqd, rrpc->rrq_pool);
>               rrpc_gc_kick(rrpc);
>               return NVM_IO_REQUEUE;
>       }
>  
> -     rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
> -     rqd->opcode = NVM_OP_HBWRITE;
> +     w_buf = &p->rblk->w_buf;
> +     rlun = p->rblk->rlun;
> +
>       rrqd->addr = p;
>  
> -     return NVM_IO_OK;
> +     err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> +     if (err) {
> +             pr_err("rrpc: could not write to write buffer\n");
> +             return err;
> +     }
> +
> +     queue_work(rrpc->kw_wq, &rlun->ws_writer);
> +     return NVM_IO_DONE;
>  }
>  
> -static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
> -                     struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
> +static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
> +                             struct rrpc_rq *rrqd, unsigned long flags)
>  {
> -     if (npages > 1) {
> +     struct nvm_rq *rqd;
> +     struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
> +     uint8_t nr_pages = rrpc_get_pages(bio);
> +     int err;
> +
> +     rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> +     if (!rqd) {
> +             pr_err_ratelimited("rrpc: not able to queue bio.");
> +             bio_io_error(bio);
> +             return NVM_IO_ERR;
> +     }
> +     rqd->metadata = NULL;
> +     rqd->priv = rrqd;
> +
> +     if (nr_pages > 1) {
>               rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
> -                                                     &rqd->dma_ppa_list);
> +                                             &rqd->dma_ppa_list);
>               if (!rqd->ppa_list) {
>                       pr_err("rrpc: not able to allocate ppa list\n");
> +                     mempool_free(rrqd, rrpc->rrq_pool);
> +                     mempool_free(rqd, rrpc->rq_pool);
>                       return NVM_IO_ERR;
>               }
>  
> -             if (bio_rw(bio) == WRITE)
> -                     return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
> -                                                                     npages);
> -
> -             return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
> +             err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
> +                                                             nr_pages);
> +             if (err) {
> +                     mempool_free(rrqd, rrpc->rrq_pool);
> +                     mempool_free(rqd, rrpc->rq_pool);
> +                     return err;
> +             }
> +     } else {
> +             err = rrpc_read_rq(rrpc, bio, rqd, flags);
> +             if (err)
> +                     return err;
>       }
>  
> -     if (bio_rw(bio) == WRITE)
> -             return rrpc_write_rq(rrpc, bio, rqd, flags);
> -
> -     return rrpc_read_rq(rrpc, bio, rqd, flags);
> -}
> -
> -static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> -                             struct nvm_rq *rqd, unsigned long flags)
> -{
> -     int err;
> -     struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
> -     uint8_t nr_pages = rrpc_get_pages(bio);
> -     int bio_size = bio_sectors(bio) << 9;
> -
> -     if (bio_size < rrpc->dev->sec_size)
> -             return NVM_IO_ERR;
> -     else if (bio_size > rrpc->dev->max_rq_size)
> -             return NVM_IO_ERR;
> -
> -     err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
> -     if (err)
> -             return err;
> -
>       bio_get(bio);
>       rqd->bio = bio;
>       rqd->ins = &rrpc->instance;
> -     rqd->nr_pages = nr_pages;
> -     rrq->flags = flags;
> +     rqd->nr_pages = rrqd->nr_pages = nr_pages;
> +     rqd->flags = flags;
>  
>       err = nvm_submit_io(rrpc->dev, rqd);
>       if (err) {
>               pr_err("rrpc: I/O submission failed: %d\n", err);
>               bio_put(bio);
>               if (!(flags & NVM_IOTYPE_GC)) {
> -                     rrpc_unlock_rq(rrpc, rqd);
> +                     rrpc_unlock_rq(rrpc, rrqd);
>                       if (rqd->nr_pages > 1)
>                               nvm_dev_dma_free(rrpc->dev,
>                       rqd->ppa_list, rqd->dma_ppa_list);
> @@ -895,10 +1113,39 @@ static int rrpc_submit_io(struct rrpc *rrpc, struct 
> bio *bio,
>       return NVM_IO_OK;
>  }
>  
> +static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
> +                             struct rrpc_rq *rrqd, unsigned long flags)
> +{
> +     uint8_t nr_pages = rrpc_get_pages(bio);
> +
> +     rrqd->nr_pages = nr_pages;
> +
> +     if (nr_pages > 1)
> +             return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
> +     else

You can skip the else here.

> +             return rrpc_write_rq(rrpc, bio, rrqd, flags);
> +}
> +
> +static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> +                             struct rrpc_rq *rrqd, unsigned long flags)
> +{
> +     int bio_size = bio_sectors(bio) << 9;
> +
> +     if (bio_size < rrpc->dev->sec_size)
> +             return NVM_IO_ERR;
> +     else if (bio_size > rrpc->dev->max_rq_size)
> +             return NVM_IO_ERR;

I have a patch incoming that removes this, probably making
rrpc_submit_io obsolete.

> +
> +     if (bio_rw(bio) == READ)
> +             return rrpc_submit_read(rrpc, bio, rrqd, flags);
> +
> +     return rrpc_buffer_write(rrpc, bio, rrqd, flags);
> +}
> +
>  static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  {
>       struct rrpc *rrpc = q->queuedata;
> -     struct nvm_rq *rqd;
> +     struct rrpc_rq *rrqd;
>       int err;
>  
>       if (bio->bi_rw & REQ_DISCARD) {
> @@ -906,15 +1153,15 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, 
> struct bio *bio)
>               return BLK_QC_T_NONE;
>       }
>  
> -     rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> -     if (!rqd) {
> -             pr_err_ratelimited("rrpc: not able to queue bio.");
> +     rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
> +     if (!rrqd) {
> +             pr_err_ratelimited("rrpc: not able to allocate rrqd.");
>               bio_io_error(bio);
>               return BLK_QC_T_NONE;
>       }
> -     memset(rqd, 0, sizeof(struct nvm_rq));

It seems like we don't use nvm_rq in the write path (the writer thread
will do the write itself). Could you move this inside the read path, and
use the original nvm_rq for submission?

> +     kref_init(&rrqd->refs);
>  
> -     err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
> +     err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
>       switch (err) {
>       case NVM_IO_OK:
>               return BLK_QC_T_NONE;
> @@ -932,10 +1179,221 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, 
> struct bio *bio)
>               break;
>       }
>  
> -     mempool_free(rqd, rrpc->rq_pool);
>       return BLK_QC_T_NONE;
>  }
>  
> +static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
> +                                                             void *data)
> +{
> +     struct page *page;
> +     int err;
> +
> +     if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
> +             return -1;

return -EINVAL?

This doesn't work on power and sparc architectures (with larger
PAGE_SIZEs). Would be great to handle that as well.

> +
> +     page = virt_to_page(data);
> +     if (!page) {
> +             pr_err("nvm: rrpc: could not alloc page\n");
> +             return -1;
> +     }
> +
> +     err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
> +     if (err != RRPC_EXPOSED_PAGE_SIZE) {
> +             pr_err("nvm: rrpc: could not add page to bio\n");
> +             return -1;
> +     }
> +
> +     return 0;
> +}
> +
> +static void rrpc_submit_write(struct work_struct *work)
> +{
> +     struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
> +     struct rrpc *rrpc = rlun->rrpc;
> +     struct nvm_dev *dev = rrpc->dev;
> +     struct rrpc_addr *addr;
> +     struct rrpc_rq *rrqd;
> +     struct rrpc_buf_rq *brrqd;
> +     void *data;
> +     struct nvm_rq *rqd;
> +     struct rrpc_block *rblk;
> +     struct bio *bio;
> +     int pgs_to_sync, pgs_avail;
> +     int sync = NVM_SYNC_HARD;
> +     int err;
> +     int i;
> +
> +     /* Note that OS pages are typically mapped to flash page sectors, which
> +      * are 4K; a flash page might be formed of several sectors. Also,
> +      * controllers typically program flash pages across multiple planes.
> +      * This is the flash programing granurality, and the reason behind the
> +      * sync strategy performed in this write thread.
> +      */
> +try:
> +     spin_lock(&rlun->parent->lock);
> +     list_for_each_entry(rblk, &rlun->open_list, list) {
> +             if (!spin_trylock(&rblk->w_buf.w_lock))
> +                     continue;
> +
> +             /* If the write thread has already submitted all I/Os in the
> +              * write buffer for this block ignore that the block is in the
> +              * open list; it is on its way to the closed list. This enables
> +              * us to avoid taking a lock on the list.
> +              */
> +             if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
> +                     spin_unlock(&rblk->w_buf.w_lock);
> +                     spin_unlock(&rlun->parent->lock);
> +                     schedule();
> +                     goto try;
> +             }
> +             pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
> +
> +             switch (sync) {
> +             case NVM_SYNC_SOFT:
> +                     pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
> +                                     rrpc->max_write_pgs : 0;
> +                     break;
> +             case NVM_SYNC_HARD:
> +                     if (pgs_avail >= rrpc->max_write_pgs)
> +                             pgs_to_sync = rrpc->max_write_pgs;
> +                     else if (pgs_avail >= rrpc->min_write_pgs)
> +                             pgs_to_sync = rrpc->min_write_pgs *
> +                                     (pgs_avail / rrpc->min_write_pgs);
> +                     else
> +                             pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
> +                     break;
> +             case NVM_SYNC_OPORT:
> +                     if (pgs_avail >= rrpc->max_write_pgs)
> +                             pgs_to_sync = rrpc->max_write_pgs;
> +                     else if (pgs_avail >= rrpc->min_write_pgs)
> +                             pgs_to_sync = rrpc->min_write_pgs *
> +                                     (pgs_avail / rrpc->min_write_pgs);
> +                     else
> +                             pgs_to_sync = 0;
> +             }
> +
> +             if (pgs_to_sync == 0) {
> +                     spin_unlock(&rblk->w_buf.w_lock);
> +                     continue;
> +             }
> +
> +             bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
> +             if (!bio) {
> +                     pr_err("nvm: rrpc: could not alloc write bio\n");
> +                     goto out1;
> +             }
> +
> +             rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> +             if (!rqd) {
> +                     pr_err_ratelimited("rrpc: not able to create w req.");
> +                     goto out2;
> +             }
> +             rqd->metadata = NULL;
> +
> +             brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
> +             if (!brrqd) {
> +                     pr_err_ratelimited("rrpc: not able to create w rea.");
> +                     goto out3;
> +             }
> +
> +             bio->bi_iter.bi_sector = 0; /* artificial bio */
> +             bio->bi_rw = WRITE;
> +
> +             rqd->opcode = NVM_OP_HBWRITE;
> +             rqd->bio = bio;
> +             rqd->ins = &rrpc->instance;
> +             rqd->nr_pages = pgs_to_sync;
> +             rqd->priv = brrqd;
> +
> +             if (pgs_to_sync == 1) {
> +                     rrqd = rblk->w_buf.subm->rrqd;
> +                     addr = rblk->w_buf.subm->addr;
> +                     rqd->flags = rblk->w_buf.subm->flags;
> +                     data = rblk->w_buf.subm->data;
> +
> +                     err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> +                     if (err) {
> +                             pr_err("rrpc: cannot allocate page in bio\n");
> +                             goto out4;
> +                     }
> +
> +                     /* TODO: This address can be skipped */
> +                     if (addr->addr == ADDR_EMPTY)
> +                             pr_err_ratelimited("rrpc: submitting empty rq");
> +
> +                     rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> +                     brrqd[0].rrqd = rrqd;
> +                     brrqd[0].addr = addr;
> +
> +                     rblk->w_buf.subm++;
> +                     rblk->w_buf.cur_subm++;
> +
> +                     goto submit_io;
> +             }
> +
> +             /* This bio will contain several pppas */
> +             rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
> +                                                     &rqd->dma_ppa_list);
> +             if (!rqd->ppa_list) {
> +                     pr_err("rrpc: not able to allocate ppa list\n");
> +                     goto out4;
> +             }
> +
> +             for (i = 0; i < pgs_to_sync; i++) {
> +                     rrqd = rblk->w_buf.subm->rrqd;
> +                     addr = rblk->w_buf.subm->addr;
> +                     rqd->flags = rblk->w_buf.subm->flags;
> +                     data = rblk->w_buf.subm->data;
> +
> +                     err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> +                     if (err) {
> +                             pr_err("rrpc: cannot allocate page in bio\n");
> +                             goto out5;
> +                     }
> +
> +                     /* TODO: This address can be skipped */
> +                     if (addr->addr == ADDR_EMPTY)
> +                             pr_err_ratelimited("rrpc: submitting empty rq");
> +
> +                     rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> +                     brrqd[i].rrqd = rrqd;
> +                     brrqd[i].addr = addr;
> +
> +                     rblk->w_buf.subm++;
> +                     rblk->w_buf.cur_subm++;
> +             }
> +
> +submit_io:
> +             WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
> +
> +             spin_unlock(&rblk->w_buf.w_lock);
> +
> +             err = nvm_submit_io(dev, rqd);
> +             if (err) {
> +                     pr_err("rrpc: I/O submission failed: %d\n", err);
> +                     mempool_free(rqd, rrpc->rq_pool);
> +                     bio_put(bio);
> +             }
> +     }
> +
> +     spin_unlock(&rlun->parent->lock);
> +     return;
> +
> +out5:
> +     nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +out4:
> +     mempool_free(brrqd, rrpc->m_rrq_pool);
> +out3:
> +     mempool_free(rqd, rrpc->rq_pool);
> +out2:
> +     bio_put(bio);
> +out1:
> +     spin_unlock(&rblk->w_buf.w_lock);
> +     spin_unlock(&rlun->parent->lock);
> +}
> +

The function can at least be split up into two parts: deciding what to sync,
and the actual write. The out* labels should be replaced with more sane names.

>  static void rrpc_requeue(struct work_struct *work)
>  {
>       struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
> @@ -978,7 +1436,7 @@ static void rrpc_gc_free(struct rrpc *rrpc)
>  
>  static int rrpc_gc_init(struct rrpc *rrpc)
>  {
> -     rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
> +     rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,

No need for reformatting

>                                                               rrpc->nr_luns);
>       if (!rrpc->krqd_wq)
>               return -ENOMEM;
> @@ -1080,6 +1538,8 @@ static int rrpc_map_init(struct rrpc *rrpc)
>  
>  static int rrpc_core_init(struct rrpc *rrpc)
>  {
> +     struct nvm_dev *dev = rrpc->dev;
> +
>       down_write(&rrpc_lock);
>       if (!rrpc_gcb_cache) {
>               rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
> @@ -1089,14 +1549,70 @@ static int rrpc_core_init(struct rrpc *rrpc)
>                       return -ENOMEM;
>               }
>  
> -             rrpc_rq_cache = kmem_cache_create("rrpc_rq",
> -                             sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
> -                             0, 0, NULL);
> +             rrpc_rq_cache = kmem_cache_create("nvm_rq",
> +                                     sizeof(struct nvm_rq), 0, 0, NULL);
>               if (!rrpc_rq_cache) {
>                       kmem_cache_destroy(rrpc_gcb_cache);
>                       up_write(&rrpc_lock);
>                       return -ENOMEM;
>               }
> +
> +             rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
> +                                     sizeof(struct rrpc_rq), 0, 0, NULL);
> +             if (!rrpc_rrq_cache) {
> +                     kmem_cache_destroy(rrpc_gcb_cache);
> +                     kmem_cache_destroy(rrpc_rq_cache);
> +                     up_write(&rrpc_lock);
> +                     return -ENOMEM;
> +             }
> +
> +             rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
> +                     rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
> +                     0, 0, NULL);
> +             if (!rrpc_buf_rrq_cache) {
> +                     kmem_cache_destroy(rrpc_gcb_cache);
> +                     kmem_cache_destroy(rrpc_rq_cache);
> +                     kmem_cache_destroy(rrpc_rrq_cache);
> +                     up_write(&rrpc_lock);
> +                     return -ENOMEM;
> +             }
> +     }
> +
> +     /* we assume that sec->sec_size is the same as the page size exposed by
> +      * rrpc (4KB). We need extra logic otherwise
> +      */
> +     if (!rrpc_block_cache) {
> +             /* Write buffer: Allocate all buffer (for all block) at once. We
> +              * avoid having to allocate a memory from the pool for each IO
> +              * at the cost pre-allocating memory for the whole block when a
> +              * new block is allocated from the media manager.
> +              */
> +             rrpc_wb_cache = kmem_cache_create("nvm_wb",
> +                     dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
> +                     0, 0, NULL);

Seems a bit excessive to allocate a mempool for this. Why not do a
vmalloc when the memory is needed? The normal case is 4MB? per
entry, and the cache creates 8 of them in the pool, using 64MB out of
the gate. We are not in a hot path at this point in time, so let's not
waste the resources when we might not use them.

> +             if (!rrpc_wb_cache) {
> +                     kmem_cache_destroy(rrpc_gcb_cache);
> +                     kmem_cache_destroy(rrpc_rq_cache);
> +                     kmem_cache_destroy(rrpc_rrq_cache);
> +                     kmem_cache_destroy(rrpc_buf_rrq_cache);
> +                     up_write(&rrpc_lock);
> +                     return -ENOMEM;
> +             }
> +
> +             /* Write buffer entries */
> +             rrpc_block_cache = kmem_cache_create("nvm_entry",
> +                     dev->pgs_per_blk * dev->sec_per_pg *
> +                     sizeof(struct buf_entry),
> +                     0, 0, NULL);
> +             if (!rrpc_block_cache) {
> +                     kmem_cache_destroy(rrpc_gcb_cache);
> +                     kmem_cache_destroy(rrpc_rq_cache);
> +                     kmem_cache_destroy(rrpc_rrq_cache);
> +                     kmem_cache_destroy(rrpc_buf_rrq_cache);
> +                     kmem_cache_destroy(rrpc_wb_cache);
> +                     up_write(&rrpc_lock);
> +                     return -ENOMEM;
> +             }
>       }
>       up_write(&rrpc_lock);
>  
> @@ -1113,17 +1629,45 @@ static int rrpc_core_init(struct rrpc *rrpc)
>       if (!rrpc->rq_pool)
>               return -ENOMEM;
>  
> +     rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
> +     if (!rrpc->rrq_pool)
> +             return -ENOMEM;
> +
> +     rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
> +     if (!rrpc->m_rrq_pool)
> +             return -ENOMEM;
> +
> +     rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
> +     if (!rrpc->block_pool)
> +             return -ENOMEM;
> +
> +     rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
> +     if (!rrpc->write_buf_pool)
> +             return -ENOMEM;

A lot of pools. I hope the nvm_rq, rrpc_rq, and rrpc_buf_rq consolidation
can move it back to just an nvm_rq entry.

> +
>       spin_lock_init(&rrpc->inflights.lock);
>       INIT_LIST_HEAD(&rrpc->inflights.reqs);
>  
> +     rrpc->kw_wq = alloc_workqueue("rrpc-writer",
> +                             WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
> +     if (!rrpc->kw_wq)
> +             return -ENOMEM;
> +
>       return 0;
>  }
>  
>  static void rrpc_core_free(struct rrpc *rrpc)
>  {
> +     if (rrpc->kw_wq)
> +             destroy_workqueue(rrpc->kw_wq);
> +
>       mempool_destroy(rrpc->page_pool);
>       mempool_destroy(rrpc->gcb_pool);
> +     mempool_destroy(rrpc->m_rrq_pool);
> +     mempool_destroy(rrpc->rrq_pool);
>       mempool_destroy(rrpc->rq_pool);
> +     mempool_destroy(rrpc->block_pool);
> +     mempool_destroy(rrpc->write_buf_pool);
>  }
>  
>  static void rrpc_luns_free(struct rrpc *rrpc)
> @@ -1164,6 +1708,8 @@ static int rrpc_luns_init(struct rrpc *rrpc, int 
> lun_begin, int lun_end)
>               INIT_LIST_HEAD(&rlun->open_list);
>               INIT_LIST_HEAD(&rlun->closed_list);
>  
> +             INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
> +
>               INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
>               spin_lock_init(&rlun->lock);
>  
> @@ -1209,6 +1755,7 @@ static void rrpc_exit(void *private)
>  
>       flush_workqueue(rrpc->krqd_wq);
>       flush_workqueue(rrpc->kgc_wq);
> +     /* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/

This can come in an additional patch.

>  
>       rrpc_free(rrpc);
>  }
> diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
> index 868e91a..6e188b4 100644
> --- a/drivers/lightnvm/rrpc.h
> +++ b/drivers/lightnvm/rrpc.h
> @@ -49,17 +49,67 @@ struct rrpc_inflight_rq {
>  struct rrpc_rq {
>       struct rrpc_inflight_rq inflight_rq;
>       struct rrpc_addr *addr;
> +     int nr_pages;
> +
> +     struct kref refs;

I think what you really want to do here is to add the ref counter to
rrpc_inflight_addr and then have that determine when to free.
(Also, name it kref.)

> +};
> +
> +struct rrpc_buf_rq {
> +     struct rrpc_addr *addr;
> +     struct rrpc_rq *rrqd;

I'm not sure why we keep a rrpc_buf_rq->addr and rrpc_rq->addr?

> +};
> +
> +/* Sync strategies from write buffer to media */
> +enum {
> +     NVM_SYNC_SOFT   = 0x0,          /* Only submit at max_write_pgs
> +                                      * supported by the device. Typically 64
> +                                      * pages (256k).
> +                                      */
> +     NVM_SYNC_HARD   = 0x1,          /* Submit the whole buffer. Add padding
> +                                      * if necessary to respect the device's
> +                                      * min_write_pgs.
> +                                      */
> +     NVM_SYNC_OPORT  = 0x2,          /* Submit what we can, always respecting
> +                                      * the device's min_write_pgs.
> +                                      */

That is great. This should probably be exposed as a sysfs option later.
> +};
> +
> +struct buf_entry {
> +     struct rrpc_rq *rrqd;
> +     void *data;
> +     struct rrpc_addr *addr;
>       unsigned long flags;
>  };
>  
> +struct rrpc_w_buf {
> +     struct buf_entry *entries;      /* Entries */
> +     struct buf_entry *mem;          /* Points to the next writable entry */
> +     struct buf_entry *subm;         /* Points to the last submitted entry */
> +     int cur_mem;                    /* Current memory entry. Follows mem */
> +     int cur_subm;                   /* Entries have been submitted to dev */
> +     int nentries;                   /* Number of entries in write buffer */

nr_entries?

> +
> +     void *data;                     /* Actual data */
> +     unsigned long *sync_bitmap;     /* Bitmap representing physical
> +                                      * addresses that have been synced to
> +                                      * the media
> +                                      */
> +
> +     atomic_t refs;

kref? semaphore?

> +
> +     spinlock_t w_lock;
> +     spinlock_t s_lock;

Is it short for submission lock?

> +};
> +
>  struct rrpc_block {
>       struct nvm_block *parent;
>       struct rrpc_lun *rlun;
>       struct list_head prio;
>       struct list_head list;
> +     struct rrpc_w_buf w_buf;
>  
>  #define MAX_INVALID_PAGES_STORAGE 8
> -     /* Bitmap for invalid page intries */
> +     /* Bitmap for invalid page entries */
>       unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
>       /* points to the next writable page within a block */
>       unsigned int next_page;
> @@ -67,7 +117,6 @@ struct rrpc_block {
>       unsigned int nr_invalid_pages;
>  
>       spinlock_t lock;
> -     atomic_t data_cmnt_size; /* data pages committed to stable storage */
>  };
>  
>  struct rrpc_lun {
> @@ -86,6 +135,7 @@ struct rrpc_lun {
>                                        */
>  
>       struct work_struct ws_gc;
> +     struct work_struct ws_writer;
>  
>       spinlock_t lock;
>  };
> @@ -136,10 +186,15 @@ struct rrpc {
>       mempool_t *page_pool;
>       mempool_t *gcb_pool;
>       mempool_t *rq_pool;
> +     mempool_t *rrq_pool;
> +     mempool_t *m_rrq_pool;
> +     mempool_t *block_pool;
> +     mempool_t *write_buf_pool;
>  
>       struct timer_list gc_timer;
>       struct workqueue_struct *krqd_wq;
>       struct workqueue_struct *kgc_wq;
> +     struct workqueue_struct *kw_wq;
>  };
>  
>  struct rrpc_block_gc {
> @@ -181,7 +236,7 @@ static inline int request_intersects(struct 
> rrpc_inflight_rq *r,
>  }
>  
>  static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> -                          unsigned pages, struct rrpc_inflight_rq *r)
> +                             unsigned pages, struct rrpc_inflight_rq *r)
>  {
>       sector_t laddr_end = laddr + pages - 1;
>       struct rrpc_inflight_rq *rtmp;
> @@ -206,27 +261,26 @@ static int __rrpc_lock_laddr(struct rrpc *rrpc, 
> sector_t laddr,
>  }
>  
>  static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> -                              unsigned pages,
> -                              struct rrpc_inflight_rq *r)
> +                             unsigned pages,
> +                             struct rrpc_inflight_rq *r)
>  {
>       BUG_ON((laddr + pages) > rrpc->nr_sects);
>  
>       return __rrpc_lock_laddr(rrpc, laddr, pages, r);
>  }
>  
> -static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq 
> *rqd)
> +static inline struct rrpc_inflight_rq
> +                             *rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
>  {
> -     struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -
>       return &rrqd->inflight_rq;
>  }
>  
>  static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
> -                                                     struct nvm_rq *rqd)
> +                                                     struct rrpc_rq *rrqd)
>  {
>       sector_t laddr = rrpc_get_laddr(bio);
>       unsigned int pages = rrpc_get_pages(bio);
> -     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>  
>       return rrpc_lock_laddr(rrpc, laddr, pages, r);
>  }
> @@ -241,12 +295,12 @@ static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
>       spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
>  }
>  
> -static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>  {
> -     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> -     uint8_t pages = rqd->nr_pages;
> +     struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> +     uint8_t nr_pages = rrqd->nr_pages;

The cleanups can be moved to another patch.

>  
> -     BUG_ON((r->l_start + pages) > rrpc->nr_sects);
> +     BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
>  
>       rrpc_unlock_laddr(rrpc, r);
>  }
> diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
> index b94f2d5..eda9743 100644
> --- a/include/linux/lightnvm.h
> +++ b/include/linux/lightnvm.h
> @@ -231,6 +231,8 @@ struct nvm_rq {
>       void *metadata;
>       dma_addr_t dma_metadata;
>  
> +     void *priv;
> +
>       struct completion *wait;
>       nvm_end_io_fn *end_io;
>  
> @@ -243,12 +245,12 @@ struct nvm_rq {
>  
>  static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
>  {
> -     return pdu - sizeof(struct nvm_rq);
> +     return container_of(pdu, struct nvm_rq, priv);
>  }
>  
>  static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
>  {
> -     return rqdata + 1;
> +     return rqdata->priv;
>  }

I think the priv is unnecessary. This should be able to be expressed in
a single structure. E.g. have the rrpc_rq be a union of the data types
you either need when processing a read or a write. It seems that they no
longer have much in common.


Reply via email to