Currently, only `-Eall-fragments` is allowed for multi-threaded
compression.  However, in many cases, we don't want the entire file
merged into the packed inode, as it may impact runtime performance.

Let's implement multi-threaded compression for `-Efragments` now,
although it's still not very fast and performance for this option
needs further optimization.

Note that `-Efragments` images built without `-Ededupe` could be larger
than `-Eall-fragments` ones, since the head parts aren't deduplicated for now.

Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com>
---
changes since v2:
 - fix up broken `-Ededupe` fallback.

 lib/compress.c  | 137 +++++++++++++++++++++++++++++++-----------------
 lib/fragments.c |  14 ++---
 lib/inode.c     |   2 +-
 3 files changed, 93 insertions(+), 60 deletions(-)

diff --git a/lib/compress.c b/lib/compress.c
index 0b48c06..32f58b5 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -110,9 +110,10 @@ struct erofs_compress_work {
 };
 
 static struct {
-       struct erofs_workqueue wq;
+       struct erofs_workqueue wq, fwq;
        struct erofs_compress_work *idle;
        pthread_mutex_t mutex;
+       bool hasfwq;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -577,11 +578,11 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
        if (len <= ctx->pclustersize) {
                if (!final || !len)
                        return 1;
-               if (inode->fragment_size && !ictx->fix_dedupedfrag) {
-                       ctx->pclustersize = roundup(len, blksz);
-                       goto fix_dedupedfrag;
-               }
                if (may_packing) {
+                       if (inode->fragment_size && !ictx->fix_dedupedfrag) {
+                               ctx->pclustersize = roundup(len, blksz);
+                               goto fix_dedupedfrag;
+                       }
                        e->length = len;
                        goto frag_packing;
                }
@@ -1056,7 +1057,26 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
                             u64 offset, erofs_blk_t blkaddr)
 {
        struct z_erofs_compress_ictx *ictx = ctx->ictx;
+       struct erofs_inode *inode = ictx->inode;
+       bool frag = cfg.c_fragments && !erofs_is_packed_inode(inode) &&
+               ctx->seg_idx >= ictx->seg_num - 1;
        int fd = ictx->fd;
+       int ret;
+
+       DBG_BUGON(offset != -1 && frag && inode->fragment_size);
+       if (offset != -1 && frag && !inode->fragment_size &&
+           cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+               ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
+               if (ret < 0)
+                       return ret;
+               if (inode->fragment_size > ctx->remaining) {
+                       /* only keep the tail part of the deduped match */
+                       inode->fragmentoff += inode->fragment_size -
+                               ctx->remaining;
+                       inode->fragment_size = ctx->remaining;
+               }
+               ctx->remaining -= inode->fragment_size;
+       }

        ctx->blkaddr = blkaddr;
        while (ctx->remaining) {
@@ -1088,8 +1104,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
        }
 
        /* generate an extra extent for the deduplicated fragment */
-       if (ctx->seg_idx >= ictx->seg_num - 1 &&
-           ictx->inode->fragment_size && !ictx->fragemitted) {
+       if (frag && inode->fragment_size && !ictx->fragemitted) {
                struct z_erofs_extent_item *ei;
 
                ei = malloc(sizeof(*ei));
@@ -1097,7 +1112,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
                        return -ENOMEM;
 
                ei->e = (struct z_erofs_inmem_extent) {
-                       .length = ictx->inode->fragment_size,
+                       .length = inode->fragment_size,
                        .compressedblks = 0,
                        .raw = false,
                        .partial = false,
@@ -1207,6 +1222,8 @@ err_free_idata:
        return ret;
 }
 
+static struct z_erofs_compress_ictx g_ictx;
+
 #ifdef EROFS_MT_ENABLED
 void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
 {
@@ -1354,9 +1371,12 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
        struct erofs_compress_work *cur, *head = NULL, **last = &head;
        struct erofs_compress_cfg *ccfg = ictx->ccfg;
        struct erofs_inode *inode = ictx->inode;
-       int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mkfs_segment_size);
-       int i;
+       unsigned int segsz = cfg.c_mkfs_segment_size;
+       int nsegs, i;
 
+       nsegs = DIV_ROUND_UP(inode->i_size - inode->fragment_size, segsz);
+       if (!nsegs)
+               nsegs = 1;
        ictx->seg_num = nsegs;
        pthread_mutex_init(&ictx->mutex, NULL);
        pthread_cond_init(&ictx->cond, NULL);
@@ -1385,13 +1405,6 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
                };
                init_list_head(&cur->ctx.extents);
 
-               if (i == nsegs - 1)
-                       cur->ctx.remaining = inode->i_size -
-                                             inode->fragment_size -
-                                             i * cfg.c_mkfs_segment_size;
-               else
-                       cur->ctx.remaining = cfg.c_mkfs_segment_size;
-
                cur->alg_id = ccfg->handle.alg->id;
                cur->alg_name = ccfg->handle.alg->name;
                cur->comp_level = ccfg->handle.compression_level;
@@ -1399,6 +1412,17 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
                cur->errcode = 1;       /* mark as "in progress" */
 
                cur->work.fn = z_erofs_mt_workfn;
+               if (i >= nsegs - 1) {
+                       cur->ctx.remaining = inode->i_size -
+                                       inode->fragment_size - (u64)i * segsz;
+                       if (z_erofs_mt_ctrl.hasfwq) {
+                               erofs_queue_work(&z_erofs_mt_ctrl.fwq,
+                                                &cur->work);
+                               continue;
+                       }
+               } else {
+                       cur->ctx.remaining = segsz;
+               }
                erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
        }
        ictx->mtworks = head;
@@ -1460,14 +1484,53 @@ out:
        free(ictx);
        return ret;
 }
-#endif
 
-static struct z_erofs_compress_ictx g_ictx;
+static int z_erofs_mt_init(void)
+{
+       unsigned int workers = cfg.c_mt_workers;
+       int ret;
+
+       if (workers < 1)
+               return 0;
+       if (cfg.c_dedupe) {
+               erofs_warn("multi-threaded dedupe is NOT implemented for now");
+               cfg.c_mt_workers = 0;
+       } else {
+               if (cfg.c_fragments && workers > 1) {
+                       ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 32,
+                                                   z_erofs_mt_wq_tls_alloc,
+                                                   z_erofs_mt_wq_tls_free);
+                       if (ret)
+                               return ret;
+                       z_erofs_mt_ctrl.hasfwq = true;
+                       --workers;
+               }
+               /* the remaining workers serve the regular segment queue */
+               ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers,
+                                           workers << 2,
+                                           z_erofs_mt_wq_tls_alloc,
+                                           z_erofs_mt_wq_tls_free);
+               if (ret)
+                       return ret;
+               z_erofs_mt_enabled = true;
+       }
+       pthread_mutex_init(&g_ictx.mutex, NULL);
+       pthread_cond_init(&g_ictx.cond, NULL);
+       return 0;
+}
+#else
+static int z_erofs_mt_init(void)
+{
+       return 0;
+}
+#endif
 
 void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
        struct erofs_sb_info *sbi = inode->sbi;
        struct z_erofs_compress_ictx *ictx;
+       bool all_fragments = cfg.c_all_fragments &&
+                                       !erofs_is_packed_inode(inode);
        int ret;
 
        /* initialize per-file compression setting */
@@ -1502,8 +1565,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
        inode->idata_size = 0;
        inode->fragment_size = 0;
 
-       if (!z_erofs_mt_enabled ||
-           (cfg.c_all_fragments && !erofs_is_packed_inode(inode))) {
+       if (!z_erofs_mt_enabled || all_fragments) {
 #ifdef EROFS_MT_ENABLED
                pthread_mutex_lock(&g_ictx.mutex);
                if (g_ictx.seg_num)
@@ -1529,7 +1591,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
         * parts into the packed inode.
         */
        if (cfg.c_fragments && !erofs_is_packed_inode(inode) &&
-           cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
+           ictx == &g_ictx && cfg.c_fragdedupe != FRAGDEDUPE_OFF) {
                ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
                if (ret < 0)
                        goto err_free_ictx;
@@ -1547,8 +1609,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
        ictx->fix_dedupedfrag = false;
        ictx->fragemitted = false;
 
-       if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
-           !inode->fragment_size) {
+       if (all_fragments && !inode->fragment_size) {
                ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
                if (ret)
                        goto err_free_idata;
@@ -1819,30 +1880,7 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
        }

        z_erofs_mt_enabled = false;
-#ifdef EROFS_MT_ENABLED
-       if (cfg.c_mt_workers >= 1 && (cfg.c_dedupe ||
-                                     (cfg.c_fragments && !cfg.c_all_fragments))) {
-               if (cfg.c_dedupe)
-                       erofs_warn("multi-threaded dedupe is NOT implemented for now");
-               if (cfg.c_fragments)
-                       erofs_warn("multi-threaded fragments is NOT implemented for now");
-               cfg.c_mt_workers = 0;
-       }
-
-       if (cfg.c_mt_workers >= 1) {
-               ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
-                                           cfg.c_mt_workers,
-                                           cfg.c_mt_workers << 2,
-                                           z_erofs_mt_wq_tls_alloc,
-                                           z_erofs_mt_wq_tls_free);
-               if (ret)
-                       return ret;
-               z_erofs_mt_enabled = true;
-       }
-       pthread_mutex_init(&g_ictx.mutex, NULL);
-       pthread_cond_init(&g_ictx.cond, NULL);
-#endif
-       return 0;
+       return z_erofs_mt_init();
 }

 int z_erofs_compress_exit(void)
@@ -1858,6 +1896,9 @@ int z_erofs_compress_exit(void)
        if (z_erofs_mt_enabled) {
 #ifdef EROFS_MT_ENABLED
                ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
+               /* fwq only exists if z_erofs_mt_init() set hasfwq */
+               if (!ret && z_erofs_mt_ctrl.hasfwq)
+                       ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq);
                if (ret)
                        return ret;
                while (z_erofs_mt_ctrl.idle) {
diff --git a/lib/fragments.c b/lib/fragments.c
index fecebb5..41b9912 100644
--- a/lib/fragments.c
+++ b/lib/fragments.c
@@ -146,21 +146,13 @@ int z_erofs_fragments_dedupe(struct erofs_inode *inode, int fd, u32 *tofcrc)
        if (inode->i_size <= EROFS_TOF_HASHLEN)
                return 0;
 
-       if (erofs_lseek64(fd, inode->i_size - EROFS_TOF_HASHLEN, SEEK_SET) < 0)
-               return -errno;
-
-       ret = read(fd, data_to_hash, EROFS_TOF_HASHLEN);
+       ret = pread(fd, data_to_hash, EROFS_TOF_HASHLEN,
+                   inode->i_size - EROFS_TOF_HASHLEN);
        if (ret != EROFS_TOF_HASHLEN)
                return -errno;
 
        *tofcrc = erofs_crc32c(~0, data_to_hash, EROFS_TOF_HASHLEN);
-       ret = z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
-       if (ret < 0)
-               return ret;
-       ret = lseek(fd, 0, SEEK_SET);
-       if (ret < 0)
-               return -errno;
-       return 0;
+       return z_erofs_fragments_dedupe_find(inode, fd, *tofcrc);
 }
 
 static int z_erofs_fragments_dedupe_insert(struct list_head *hash, void *data,
diff --git a/lib/inode.c b/lib/inode.c
index 8c9a8ec..c4edd43 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -1312,7 +1312,7 @@ struct erofs_mkfs_dfops {
        bool idle;      /* initialize as false before the dfops worker runs */
 };
 
-#define EROFS_MT_QUEUE_SIZE 128
+#define EROFS_MT_QUEUE_SIZE 256
 
 static void erofs_mkfs_flushjobs(struct erofs_sb_info *sbi)
 {
-- 
2.43.5


Reply via email to