Currently, only `-Eall-fragments` is allowed for multi-threaded compression. However, in many cases, we don't want the entire file merged into the packed inode, as it may impact runtime performance.
Let's implement multi-threaded compression for `-Efragments` now, although it's still not very fast, and we need to optimize its performance further for this option. Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com> --- lib/compress.c | 134 +++++++++++++++++++++++++++++++----------------- lib/fragments.c | 14 ++--- lib/inode.c | 2 +- 3 files changed, 90 insertions(+), 60 deletions(-) diff --git a/lib/compress.c b/lib/compress.c index 0b48c06..116bfe5 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -110,9 +110,10 @@ struct erofs_compress_work { }; static struct { - struct erofs_workqueue wq; + struct erofs_workqueue wq, fwq; struct erofs_compress_work *idle; pthread_mutex_t mutex; + bool hasfwq; } z_erofs_mt_ctrl; #endif @@ -577,11 +578,11 @@ static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx, if (len <= ctx->pclustersize) { if (!final || !len) return 1; - if (inode->fragment_size && !ictx->fix_dedupedfrag) { - ctx->pclustersize = roundup(len, blksz); - goto fix_dedupedfrag; - } if (may_packing) { + if (inode->fragment_size && !ictx->fix_dedupedfrag) { + ctx->pclustersize = roundup(len, blksz); + goto fix_dedupedfrag; + } e->length = len; goto frag_packing; } @@ -1056,7 +1057,20 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx, u64 offset, erofs_blk_t blkaddr) { struct z_erofs_compress_ictx *ictx = ctx->ictx; + bool last = ctx->seg_idx >= ictx->seg_num - 1; + struct erofs_inode *inode = ictx->inode; int fd = ictx->fd; + int ret; + + DBG_BUGON(offset != -1 && last && inode->fragment_size); + if (offset != -1 && last && !inode->fragment_size) { + ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum); + if (ret < 0) + return ret; + if (inode->fragment_size > ctx->remaining) + inode->fragment_size = ctx->remaining; + ctx->remaining -= inode->fragment_size; + } ctx->blkaddr = blkaddr; while (ctx->remaining) { @@ -1088,8 +1102,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx, } /* generate an extra extent 
for the deduplicated fragment */ - if (ctx->seg_idx >= ictx->seg_num - 1 && - ictx->inode->fragment_size && !ictx->fragemitted) { + if (last && inode->fragment_size && !ictx->fragemitted) { struct z_erofs_extent_item *ei; ei = malloc(sizeof(*ei)); @@ -1097,7 +1110,7 @@ int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx, return -ENOMEM; ei->e = (struct z_erofs_inmem_extent) { - .length = ictx->inode->fragment_size, + .length = inode->fragment_size, .compressedblks = 0, .raw = false, .partial = false, @@ -1207,6 +1220,8 @@ err_free_idata: return ret; } +static struct z_erofs_compress_ictx g_ictx; + #ifdef EROFS_MT_ENABLED void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr) { @@ -1354,9 +1369,12 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx) struct erofs_compress_work *cur, *head = NULL, **last = &head; struct erofs_compress_cfg *ccfg = ictx->ccfg; struct erofs_inode *inode = ictx->inode; - int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_mkfs_segment_size); - int i; + unsigned int segsz = cfg.c_mkfs_segment_size; + int nsegs, i; + nsegs = DIV_ROUND_UP(inode->i_size - inode->fragment_size, segsz); + if (!nsegs) + nsegs = 1; ictx->seg_num = nsegs; pthread_mutex_init(&ictx->mutex, NULL); pthread_cond_init(&ictx->cond, NULL); @@ -1385,13 +1403,6 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx) }; init_list_head(&cur->ctx.extents); - if (i == nsegs - 1) - cur->ctx.remaining = inode->i_size - - inode->fragment_size - - i * cfg.c_mkfs_segment_size; - else - cur->ctx.remaining = cfg.c_mkfs_segment_size; - cur->alg_id = ccfg->handle.alg->id; cur->alg_name = ccfg->handle.alg->name; cur->comp_level = ccfg->handle.compression_level; @@ -1399,6 +1410,17 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx) cur->errcode = 1; /* mark as "in progress" */ cur->work.fn = z_erofs_mt_workfn; + if (i >= nsegs - 1) { + cur->ctx.remaining = inode->i_size - + inode->fragment_size - i * segsz; + if (z_erofs_mt_ctrl.hasfwq) { + 
erofs_queue_work(&z_erofs_mt_ctrl.fwq, + &cur->work); + continue; + } + } else { + cur->ctx.remaining = segsz; + } erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work); } ictx->mtworks = head; @@ -1460,14 +1482,52 @@ out: free(ictx); return ret; } -#endif -static struct z_erofs_compress_ictx g_ictx; +static int z_erofs_mt_init(void) +{ + unsigned int workers = cfg.c_mt_workers; + int ret; + + if (workers < 1) + return 0; + if (workers >= 1 && cfg.c_dedupe) { + erofs_warn("multi-threaded dedupe is NOT implemented for now"); + cfg.c_mt_workers = 0; + } + + if (cfg.c_fragments && workers > 1) { + ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.fwq, 1, 128, + z_erofs_mt_wq_tls_alloc, + z_erofs_mt_wq_tls_free); + if (ret) + return ret; + z_erofs_mt_ctrl.hasfwq = true; + --workers; + } + + ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, workers, + workers << 2, z_erofs_mt_wq_tls_alloc, + z_erofs_mt_wq_tls_free); + if (ret) + return ret; + z_erofs_mt_enabled = true; + pthread_mutex_init(&g_ictx.mutex, NULL); + pthread_cond_init(&g_ictx.cond, NULL); + return 0; +} +#else +int z_erofs_mt_init(void) +{ + return 0; +} +#endif void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) { struct erofs_sb_info *sbi = inode->sbi; struct z_erofs_compress_ictx *ictx; + bool all_fragments = cfg.c_all_fragments && + !erofs_is_packed_inode(inode); int ret; /* initialize per-file compression setting */ @@ -1502,8 +1562,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) inode->idata_size = 0; inode->fragment_size = 0; - if (!z_erofs_mt_enabled || - (cfg.c_all_fragments && !erofs_is_packed_inode(inode))) { + if (!z_erofs_mt_enabled || all_fragments) { #ifdef EROFS_MT_ENABLED pthread_mutex_lock(&g_ictx.mutex); if (g_ictx.seg_num) @@ -1528,7 +1587,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) * Handle tails in advance to avoid writing duplicated * parts into the packed inode. 
*/ - if (cfg.c_fragments && !erofs_is_packed_inode(inode) && + if ((cfg.c_fragments && ictx == &g_ictx) && cfg.c_fragdedupe != FRAGDEDUPE_OFF) { ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum); if (ret < 0) @@ -1547,8 +1606,7 @@ void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) ictx->fix_dedupedfrag = false; ictx->fragemitted = false; - if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) && - !inode->fragment_size) { + if (all_fragments && !inode->fragment_size) { ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum); if (ret) goto err_free_idata; @@ -1819,30 +1877,7 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s } z_erofs_mt_enabled = false; -#ifdef EROFS_MT_ENABLED - if (cfg.c_mt_workers >= 1 && (cfg.c_dedupe || - (cfg.c_fragments && !cfg.c_all_fragments))) { - if (cfg.c_dedupe) - erofs_warn("multi-threaded dedupe is NOT implemented for now"); - if (cfg.c_fragments) - erofs_warn("multi-threaded fragments is NOT implemented for now"); - cfg.c_mt_workers = 0; - } - - if (cfg.c_mt_workers >= 1) { - ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq, - cfg.c_mt_workers, - cfg.c_mt_workers << 2, - z_erofs_mt_wq_tls_alloc, - z_erofs_mt_wq_tls_free); - if (ret) - return ret; - z_erofs_mt_enabled = true; - } - pthread_mutex_init(&g_ictx.mutex, NULL); - pthread_cond_init(&g_ictx.cond, NULL); -#endif - return 0; + return z_erofs_mt_init(); } int z_erofs_compress_exit(void) @@ -1858,6 +1893,9 @@ int z_erofs_compress_exit(void) if (z_erofs_mt_enabled) { #ifdef EROFS_MT_ENABLED ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq); + if (ret) + return ret; + ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.fwq); if (ret) return ret; while (z_erofs_mt_ctrl.idle) { diff --git a/lib/fragments.c b/lib/fragments.c index fecebb5..41b9912 100644 --- a/lib/fragments.c +++ b/lib/fragments.c @@ -146,21 +146,13 @@ int z_erofs_fragments_dedupe(struct erofs_inode *inode, int fd, u32 *tofcrc) if 
(inode->i_size <= EROFS_TOF_HASHLEN) return 0; - if (erofs_lseek64(fd, inode->i_size - EROFS_TOF_HASHLEN, SEEK_SET) < 0) - return -errno; - - ret = read(fd, data_to_hash, EROFS_TOF_HASHLEN); + ret = pread(fd, data_to_hash, EROFS_TOF_HASHLEN, + inode->i_size - EROFS_TOF_HASHLEN); if (ret != EROFS_TOF_HASHLEN) return -errno; *tofcrc = erofs_crc32c(~0, data_to_hash, EROFS_TOF_HASHLEN); - ret = z_erofs_fragments_dedupe_find(inode, fd, *tofcrc); - if (ret < 0) - return ret; - ret = lseek(fd, 0, SEEK_SET); - if (ret < 0) - return -errno; - return 0; + return z_erofs_fragments_dedupe_find(inode, fd, *tofcrc); } static int z_erofs_fragments_dedupe_insert(struct list_head *hash, void *data, diff --git a/lib/inode.c b/lib/inode.c index 8c9a8ec..c4edd43 100644 --- a/lib/inode.c +++ b/lib/inode.c @@ -1312,7 +1312,7 @@ struct erofs_mkfs_dfops { bool idle; /* initialize as false before the dfops worker runs */ }; -#define EROFS_MT_QUEUE_SIZE 128 +#define EROFS_MT_QUEUE_SIZE 256 static void erofs_mkfs_flushjobs(struct erofs_sb_info *sbi) { -- 2.43.5