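Group fragments into 1024 buckets keyed by their hashed tail-data digest. Fragments in different buckets can then be compressed in parallel on the main compression workqueue. Fragments that land in the same bucket are still processed serially. This replaces the dedicated single-worker fragment workqueue. Tails without a digest (tofh == ~0U) are queued like ordinary segments.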
Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com> --- lib/compress.c | 88 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 17 deletions(-) diff --git a/lib/compress.c b/lib/compress.c index cbc51ca..b6e0c12 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -59,7 +59,10 @@ struct z_erofs_compress_ictx { /* inode context */ }; struct z_erofs_compress_sctx { /* segment context */ - struct list_head extents; + union { + struct list_head extents; + struct list_head sibling; + }; struct z_erofs_compress_ictx *ictx; u8 *queue; @@ -104,11 +107,18 @@ struct erofs_compress_work { }; static struct { - struct erofs_workqueue wq, fwq; + struct erofs_workqueue wq; struct erofs_compress_work *idle; pthread_mutex_t mutex; bool hasfwq; } z_erofs_mt_ctrl; + +struct z_erofs_compress_fslot { + struct list_head pending; + pthread_mutex_t lock; + bool inprogress; +}; + #endif /* compressing configuration specified by users */ @@ -120,6 +130,9 @@ struct erofs_compress_cfg { struct z_erofs_mgr { struct erofs_compress_cfg ccfg[EROFS_MAX_COMPR_CFGS]; +#ifdef EROFS_MT_ENABLED + struct z_erofs_compress_fslot fslot[1024]; +#endif }; static bool z_erofs_mt_enabled; @@ -1364,6 +1377,32 @@ out: pthread_mutex_unlock(&ictx->mutex); } +void z_erofs_mt_f_workfn(struct erofs_work *work, void *tlsp) +{ + struct erofs_compress_work *cwork = (struct erofs_compress_work *)work; + struct erofs_sb_info *sbi = cwork->ctx.ictx->inode->sbi; + u32 tofh = cwork->ctx.ictx->tofh; + struct z_erofs_compress_fslot *fs = &sbi->zmgr->fslot[tofh & 1023]; + + while (1) { + z_erofs_mt_workfn(work, tlsp); + pthread_mutex_lock(&fs->lock); + + if (list_empty(&fs->pending)) { + fs->inprogress = false; + pthread_mutex_unlock(&fs->lock); + break; + } + cwork = list_first_entry(&fs->pending, + struct erofs_compress_work, + ctx.sibling); + list_del(&cwork->ctx.sibling); + pthread_mutex_unlock(&fs->lock); + init_list_head(&cwork->ctx.extents); + work = &cwork->work; + } +} + int 
z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx, struct z_erofs_compress_sctx *sctx) { @@ -1463,19 +1502,32 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx) cur->dict_size = ccfg->handle.dict_size; cur->errcode = 1; /* mark as "in progress" */ - cur->work.fn = z_erofs_mt_workfn; if (i >= nsegs - 1) { cur->ctx.remaining = inode->i_size - inode->fragment_size - (u64)i * segsz; - if (z_erofs_mt_ctrl.hasfwq) { - erofs_queue_work(&z_erofs_mt_ctrl.fwq, + if (z_erofs_mt_ctrl.hasfwq && ictx->tofh != ~0U) { + struct z_erofs_mgr *zmgr = inode->sbi->zmgr; + struct z_erofs_compress_fslot *fs = + &zmgr->fslot[ictx->tofh & 1023]; + + pthread_mutex_lock(&fs->lock); + if (fs->inprogress) { + list_add_tail(&cur->ctx.sibling, + &fs->pending); + } else { + fs->inprogress = true; + cur->work.fn = z_erofs_mt_f_workfn; + erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work); + } + pthread_mutex_unlock(&fs->lock); continue; } } else { cur->ctx.remaining = segsz; } + cur->work.fn = z_erofs_mt_workfn; erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work); } ictx->mtworks = head; @@ -1548,15 +1600,8 @@ static int z_erofs_mt_init(void) erofs_warn("multi-threaded dedupe is NOT implemented for now"); cfg.c_mt_workers = 0; } else { - if (cfg.c_fragments && workers > 1) { - ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 32, - z_erofs_mt_wq_tls_alloc, - z_erofs_mt_wq_tls_free); - if (ret) - return ret; + if (cfg.c_fragments && workers > 1) z_erofs_mt_ctrl.hasfwq = true; - --workers; - } ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers, workers << 2, @@ -1930,7 +1975,19 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s } z_erofs_mt_enabled = false; - return z_erofs_mt_init(); + ret = z_erofs_mt_init(); + if (ret) + return ret; + +#ifdef EROFS_MT_ENABLED + if (z_erofs_mt_ctrl.hasfwq) { + for (i = 0; i < ARRAY_SIZE(sbi->zmgr->fslot); ++i) { + init_list_head(&sbi->zmgr->fslot[i].pending); + pthread_mutex_init(&sbi->zmgr->fslot[i].lock, 
NULL); + } + } +#endif + return 0; } int z_erofs_compress_exit(struct erofs_sb_info *sbi) @@ -1951,9 +2008,6 @@ int z_erofs_compress_exit(struct erofs_sb_info *sbi) if (z_erofs_mt_enabled) { #ifdef EROFS_MT_ENABLED ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq); - if (ret) - return ret; - ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq); if (ret) return ret; while (z_erofs_mt_ctrl.idle) { -- 2.43.5