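Group fragments into multiple buckets keyed by the hash of their tail
data (tofh), so that fragments in different buckets can be compressed in
parallel on the shared workqueue while fragments in the same bucket are
still compressed one at a time.  This replaces the dedicated
single-worker fragment workqueue.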

Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com>
---
 lib/compress.c | 88 ++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 71 insertions(+), 17 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index cbc51ca..b6e0c12 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -59,7 +59,10 @@ struct z_erofs_compress_ictx {               /* inode context */
 };
 
 struct z_erofs_compress_sctx {         /* segment context */
-       struct list_head extents;
+       union {
+               struct list_head extents;
+               struct list_head sibling;       /* link in fslot->pending */
+       };
        struct z_erofs_compress_ictx *ictx;
 
        u8 *queue;
@@ -104,11 +107,18 @@ struct erofs_compress_work {
 };
 
 static struct {
-       struct erofs_workqueue wq, fwq;
+       struct erofs_workqueue wq;      /* runs segment and fragment works */
        struct erofs_compress_work *idle;
        pthread_mutex_t mutex;
        bool hasfwq;
 } z_erofs_mt_ctrl;
+
+struct z_erofs_compress_fslot {        /* one fragment bucket */
+       struct list_head pending;       /* works waiting for this bucket */
+       pthread_mutex_t lock;           /* protects pending and inprogress */
+       bool inprogress;                /* a worker is draining this bucket */
+};
+
 #endif
 
 /* compressing configuration specified by users */
@@ -120,6 +130,9 @@ struct erofs_compress_cfg {
 
 struct z_erofs_mgr {
        struct erofs_compress_cfg ccfg[EROFS_MAX_COMPR_CFGS];
+#ifdef EROFS_MT_ENABLED
+       struct z_erofs_compress_fslot fslot[1024];      /* indexed by tofh & 1023 */
+#endif
 };
 
 static bool z_erofs_mt_enabled;
@@ -1364,6 +1377,39 @@ out:
        pthread_mutex_unlock(&ictx->mutex);
 }
 
+/*
+ * Work function for the final segment of an inode whose tail-data hash
+ * (tofh) selects a fragment bucket.  After compressing @work, keep
+ * draining the bucket's pending list on this worker so that works in the
+ * same bucket run one at a time, then clear ->inprogress once it is empty.
+ */
+static void z_erofs_mt_f_workfn(struct erofs_work *work, void *tlsp)
+{
+       struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
+       struct erofs_sb_info *sbi = cwork->ctx.ictx->inode->sbi;
+       u32 tofh = cwork->ctx.ictx->tofh;
+       struct z_erofs_compress_fslot *fs = &sbi->zmgr->fslot[tofh & 1023];
+
+       while (1) {
+               z_erofs_mt_workfn(work, tlsp);
+               pthread_mutex_lock(&fs->lock);
+
+               if (list_empty(&fs->pending)) {
+                       fs->inprogress = false;
+                       pthread_mutex_unlock(&fs->lock);
+                       break;
+               }
+               cwork = list_first_entry(&fs->pending,
+                                        struct erofs_compress_work,
+                                        ctx.sibling);
+               list_del(&cwork->ctx.sibling);
+               pthread_mutex_unlock(&fs->lock);
+               /* ->sibling shares storage with ->extents: reset it for use */
+               init_list_head(&cwork->ctx.extents);
+               work = &cwork->work;
+       }
+}
+
 int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
                           struct z_erofs_compress_sctx *sctx)
 {
@@ -1463,19 +1502,38 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
                cur->dict_size = ccfg->handle.dict_size;
                cur->errcode = 1;       /* mark as "in progress" */
 
-               cur->work.fn = z_erofs_mt_workfn;
                if (i >= nsegs - 1) {
                        cur->ctx.remaining = inode->i_size -
                                        inode->fragment_size - (u64)i * segsz;
 
-                       if (z_erofs_mt_ctrl.hasfwq) {
-                               erofs_queue_work(&z_erofs_mt_ctrl.fwq,
+                       if (z_erofs_mt_ctrl.hasfwq && ictx->tofh != ~0U) {
+                               struct z_erofs_mgr *zmgr = inode->sbi->zmgr;
+                               struct z_erofs_compress_fslot *fs =
+                                       &zmgr->fslot[ictx->tofh & 1023];
+
+                               /*
+                                * Works hashing to the same bucket run one at
+                                * a time: if a worker is already draining this
+                                * bucket, append to its pending list (it will
+                                * call z_erofs_mt_workfn() on it directly).
+                                */
+                               pthread_mutex_lock(&fs->lock);
+                               if (fs->inprogress) {
+                                       list_add_tail(&cur->ctx.sibling,
+                                                     &fs->pending);
+                               } else {
+                                       fs->inprogress = true;
+                                       cur->work.fn = z_erofs_mt_f_workfn;
+                                       erofs_queue_work(&z_erofs_mt_ctrl.wq,
                                                 &cur->work);
+                               }
+                               pthread_mutex_unlock(&fs->lock);
                                continue;
                        }
                } else {
                        cur->ctx.remaining = segsz;
                }
+               cur->work.fn = z_erofs_mt_workfn;
                erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
        }
        ictx->mtworks = head;
@@ -1548,15 +1600,8 @@ static int z_erofs_mt_init(void)
                erofs_warn("multi-threaded dedupe is NOT implemented for now");
                cfg.c_mt_workers = 0;
        } else {
-               if (cfg.c_fragments && workers > 1) {
-                       ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 32,
-                                                   z_erofs_mt_wq_tls_alloc,
-                                                   z_erofs_mt_wq_tls_free);
-                       if (ret)
-                               return ret;
+               if (cfg.c_fragments && workers > 1)     /* bucketed fragment works on wq */
                        z_erofs_mt_ctrl.hasfwq = true;
-                       --workers;
-               }
 
                ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers,
                                            workers << 2,
@@ -1930,7 +1975,21 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
        }
 
        z_erofs_mt_enabled = false;
-       return z_erofs_mt_init();
+       ret = z_erofs_mt_init();
+       if (ret)
+               return ret;
+
+#ifdef EROFS_MT_ENABLED
+       if (z_erofs_mt_ctrl.hasfwq) {
+               /* set every field explicitly; don't rely on a zeroed zmgr */
+               for (i = 0; i < ARRAY_SIZE(sbi->zmgr->fslot); ++i) {
+                       init_list_head(&sbi->zmgr->fslot[i].pending);
+                       pthread_mutex_init(&sbi->zmgr->fslot[i].lock, NULL);
+                       sbi->zmgr->fslot[i].inprogress = false;
+               }
+       }
+#endif
+       return 0;
 }
 
 int z_erofs_compress_exit(struct erofs_sb_info *sbi)
@@ -1951,9 +2008,15 @@ int z_erofs_compress_exit(struct erofs_sb_info *sbi)
        if (z_erofs_mt_enabled) {
 #ifdef EROFS_MT_ENABLED
                ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
-               if (ret)
-                       return ret;
-               ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq);
                if (ret)
                        return ret;
+               /* bucket locks are torn down only after the workqueue */
+               if (z_erofs_mt_ctrl.hasfwq) {
+                       struct z_erofs_compress_fslot *fs = sbi->zmgr->fslot;
+                       struct z_erofs_compress_fslot *end =
+                               fs + ARRAY_SIZE(sbi->zmgr->fslot);
+
+                       for (; fs < end; ++fs)
+                               pthread_mutex_destroy(&fs->lock);
+               }
                while (z_erofs_mt_ctrl.idle) {
-- 
2.43.5


Reply via email to