I think the source code would be easier to understand if the names of
variables and pointers to the same structure were unified. For example:
    struct z_erofs_compress_ictx inode_ctx;   // i stands for inode?
    struct z_erofs_compress_ictx *ictx;
    struct z_erofs_compress_sctx seg_ctx;
    struct z_erofs_compress_sctx *sctx;

On Fri, Mar 15, 2024 at 10:11, Gao Xiang <hsiang...@linux.alibaba.com> wrote:
>
> From: Yifan Zhao <zhaoyi...@sjtu.edu.cn>
>
> Currently, the creation of EROFS compressed images is single-threaded,
> which suffers from performance issues. This patch attempts to address
> it by compressing large files in parallel.
>
> Specifically, each input file larger than 16MB is split into segments,
> and each worker thread compresses a segment as if it were a separate
> file. Finally, the main thread merges all the compressed segments.
>
> Multi-threaded compression is not compatible with -Ededupe,
> -E(all-)fragments and -Eztailpacking for now.
>
> Signed-off-by: Yifan Zhao <zhaoyi...@sjtu.edu.cn>
> Co-authored-by: Tong Xin <xin_t...@sjtu.edu.cn>
> Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com>
> ---
> v7:
>  - support -Eztailpacking;
>  - wq_private -> wq_tls;
>  - minor updates.
>
>  include/erofs/compress.h |   3 +-
>  lib/compress.c           | 548 ++++++++++++++++++++++++++++++++-------
>  lib/compressor.c         |   2 +
>  mkfs/main.c              |   8 +-
>  4 files changed, 464 insertions(+), 97 deletions(-)
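A quick aside for readers following the design: the "larger than 16MB is
split into segments" rule above boils down to the DIV_ROUND_UP() fan-out in
z_erofs_mt_compress() later in this patch. A minimal sketch of that
arithmetic (illustrative only, not part of the patch; SEGMENT_SIZE stands in
for cfg.c_segment_size, and the tail-segment adjustment for
inode->fragment_size is omitted):

    #include <stdint.h>

    #define SEGMENT_SIZE (16ULL << 20) /* assumed value of cfg.c_segment_size */

    /* number of worker segments, i.e. DIV_ROUND_UP(i_size, SEGMENT_SIZE) */
    static inline uint64_t nr_segments(uint64_t i_size)
    {
            return (i_size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
    }

    /* segment i starts at i * SEGMENT_SIZE; only the last one may be short */
    static inline uint64_t segment_length(uint64_t i_size, uint64_t i)
    {
            if (i == nr_segments(i_size) - 1)
                    return i_size - i * SEGMENT_SIZE;
            return SEGMENT_SIZE;
    }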
>
> diff --git a/include/erofs/compress.h b/include/erofs/compress.h
> index b3272f7..3253611 100644
> --- a/include/erofs/compress.h
> +++ b/include/erofs/compress.h
> @@ -14,7 +14,8 @@ extern "C"
>
>  #include "internal.h"
>
> -#define EROFS_CONFIG_COMPR_MAX_SZ        (4000 * 1024)
> +#define EROFS_CONFIG_COMPR_MAX_SZ        (4000 * 1024)
> +#define Z_EROFS_COMPR_QUEUE_SZ           (EROFS_CONFIG_COMPR_MAX_SZ * 2)
>
>  void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
>  int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
> diff --git a/lib/compress.c b/lib/compress.c
> index 4101009..0d796c8 100644
> --- a/lib/compress.c
> +++ b/lib/compress.c
> @@ -20,6 +20,9 @@
>  #include "erofs/block_list.h"
>  #include "erofs/compress_hints.h"
>  #include "erofs/fragments.h"
> +#ifdef EROFS_MT_ENABLED
> +#include "erofs/workqueue.h"
> +#endif
>
>  /* compressing configuration specified by users */
>  struct erofs_compress_cfg {
> @@ -33,29 +36,77 @@ struct z_erofs_extent_item {
>          struct z_erofs_inmem_extent e;
>  };
>
> -struct z_erofs_vle_compress_ctx {
> -        u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
> +struct z_erofs_compress_ictx {
> +        struct erofs_inode *inode;
> +        int fd;
> +        unsigned int pclustersize;
> +
> +        u32 tof_chksum;
> +        bool fix_dedupedfrag;
> +        bool fragemitted;
> +
> +        /* fields for write indexes */
> +        u8 *metacur;
> +        struct list_head extents;
> +        u16 clusterofs;
> +};
> +
> +struct z_erofs_compress_sctx {          /* segment context */
> +        struct z_erofs_compress_ictx *ictx;
> +
> +        u8 *queue;
>          struct list_head extents;
>          struct z_erofs_extent_item *pivot;
>
> -        struct erofs_inode *inode;
> -        struct erofs_compress_cfg *ccfg;
> +        struct erofs_compress *chandle;
> +        char *destbuf;
>
> -        u8 *metacur;
>          unsigned int head, tail;
>          erofs_off_t remaining;
> -        unsigned int pclustersize;
>          erofs_blk_t blkaddr;            /* pointing to the next blkaddr */
>          u16 clusterofs;
>
> -        u32 tof_chksum;
> -        bool fix_dedupedfrag;
> -        bool fragemitted;
> +        int seg_num, seg_idx;
> +
> +        void *membuf;
> +        erofs_off_t memoff;
> +};
> +
> +#ifdef EROFS_MT_ENABLED
> +struct erofs_compress_wq_tls {
> +        u8 *queue;
> +        char *destbuf;
> +        struct erofs_compress_cfg *ccfg;
>  };
>
> +struct erofs_compress_work {
> +        /* Note: struct erofs_work must be the first member */
> +        struct erofs_work work;
> +        struct z_erofs_compress_sctx ctx;
> +        struct erofs_compress_work *next;
> +
> +        unsigned int alg_id;
> +        char *alg_name;
> +        unsigned int comp_level;
> +        unsigned int dict_size;
> +
> +        int errcode;
> +};
> +
> +static struct {
> +        struct erofs_workqueue wq;
> +        struct erofs_compress_work *idle;
> +        pthread_mutex_t mutex;
> +        pthread_cond_t cond;
> +        int nfini;
> +} z_erofs_mt_ctrl;
> +#endif
> +
> +static bool z_erofs_mt_enabled;
> +
>  #define Z_EROFS_LEGACY_MAP_HEADER_SIZE   Z_EROFS_FULL_INDEX_ALIGN(0)
>
> -static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
> +static void z_erofs_write_indexes_final(struct z_erofs_compress_ictx *ctx)
>  {
>          const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
>          struct z_erofs_lcluster_index di;
> @@ -71,7 +122,7 @@ static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
>          ctx->metacur += sizeof(di);
>  }
>
> -static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static void z_erofs_write_extent(struct z_erofs_compress_ictx *ctx,
>                                   struct z_erofs_inmem_extent *e)
>  {
>          struct erofs_inode *inode = ctx->inode;
> @@ -170,7 +221,7 @@ static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
>          ctx->clusterofs = clusterofs + count;
>  }
>
> -static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
> +static void z_erofs_write_indexes(struct z_erofs_compress_ictx *ctx)
>  {
>          struct z_erofs_extent_item *ei, *n;
>
> @@ -184,15 +235,16 @@ static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
>          z_erofs_write_indexes_final(ctx);
>  }
>
> -static bool z_erofs_need_refill(struct z_erofs_vle_compress_ctx *ctx)
> +static bool z_erofs_need_refill(struct z_erofs_compress_sctx *ctx)
>  {
>          const bool final = !ctx->remaining;
>          unsigned int qh_aligned, qh_after;
> +        struct erofs_inode *inode = ctx->ictx->inode;
>
>          if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
>                  return false;
>
> -        qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
> +        qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
>          qh_after = ctx->head - qh_aligned;
>          memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
>          ctx->tail -= qh_aligned;
> @@ -204,7 +256,7 @@ static struct z_erofs_extent_item dummy_pivot = {
>          .e.length = 0
>  };
>
> -static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static void z_erofs_commit_extent(struct z_erofs_compress_sctx *ctx,
>                                    struct z_erofs_extent_item *ei)
>  {
>          if (ei == &dummy_pivot)
> @@ -212,14 +264,13 @@ static void z_erofs_commit_extent(struct z_erofs_vle_compress_ctx *ctx,
>
>          list_add_tail(&ei->list, &ctx->extents);
>          ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
> -                          (erofs_blksiz(ctx->inode->sbi) - 1);
> -
> +                          (erofs_blksiz(ctx->ictx->inode->sbi) - 1);
>  }
>
> -static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
> +static int z_erofs_compress_dedupe(struct z_erofs_compress_sctx *ctx,
>                                     unsigned int *len)
>  {
> -        struct erofs_inode *inode = ctx->inode;
> +        struct erofs_inode *inode = ctx->ictx->inode;
>          const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
>          struct erofs_sb_info *sbi = inode->sbi;
>          struct z_erofs_extent_item *ei = ctx->pivot;
> @@ -315,16 +366,17 @@ out:
>          return 0;
>  }
>
> -static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
> +static int write_uncompressed_extent(struct z_erofs_compress_sctx *ctx,
>                                       unsigned int len, char *dst)
>  {
> -        struct erofs_sb_info *sbi = ctx->inode->sbi;
> +        struct erofs_inode *inode = ctx->ictx->inode;
> +        struct erofs_sb_info *sbi = inode->sbi;
>          unsigned int count = min(erofs_blksiz(sbi), len);
>          unsigned int interlaced_offset, rightpart;
>          int ret;
>
>          /* write interlaced uncompressed data if needed */
> -        if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
> +        if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
>                  interlaced_offset = ctx->clusterofs;
>          else
>                  interlaced_offset = 0;
> @@ -335,11 +387,17 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
>          memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
>          memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
>
> -        erofs_dbg("Writing %u uncompressed data to block %u",
> -                  count, ctx->blkaddr);
> -        ret = blk_write(sbi, dst, ctx->blkaddr, 1);
> -        if (ret)
> -                return ret;
> +        if (ctx->membuf) {
> +                erofs_dbg("Writing %u uncompressed data to membuf", count);
> +                memcpy(ctx->membuf + ctx->memoff, dst, erofs_blksiz(sbi));
> +                ctx->memoff += erofs_blksiz(sbi);
> +        } else {
> +                erofs_dbg("Writing %u uncompressed data to block %u", count,
> +                          ctx->blkaddr);
> +                ret = blk_write(sbi, dst, ctx->blkaddr, 1);
> +                if (ret)
> +                        return ret;
> +        }
>          return count;
>  }
>
> @@ -379,12 +437,12 @@ static int z_erofs_fill_inline_data(struct erofs_inode *inode, void *data,
>          return len;
>  }
>
> -static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
> +static void tryrecompress_trailing(struct z_erofs_compress_sctx *ctx,
>                                     struct erofs_compress *ec,
>                                     void *in, unsigned int *insize,
>                                     void *out, unsigned int *compressedsize)
>  {
> -        struct erofs_sb_info *sbi = ctx->inode->sbi;
> +        struct erofs_sb_info *sbi = ctx->ictx->inode->sbi;
>          static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
>          unsigned int count;
>          int ret = *compressedsize;
> @@ -406,10 +464,11 @@ static void tryrecompress_trailing(struct z_erofs_vle_compress_ctx *ctx,
>          *compressedsize = ret;
>  }
>
> -static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
> +static bool z_erofs_fixup_deduped_fragment(struct z_erofs_compress_sctx *ctx,
>                                             unsigned int len)
>  {
> -        struct erofs_inode *inode = ctx->inode;
> +        struct z_erofs_compress_ictx *ictx = ctx->ictx;
> +        struct erofs_inode *inode = ictx->inode;
>          struct erofs_sb_info *sbi = inode->sbi;
>          const unsigned int newsize = ctx->remaining + len;
>
> @@ -417,9 +476,10 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>
>          /* try to fix again if it gets larger (should be rare) */
>          if (inode->fragment_size < newsize) {
> -                ctx->pclustersize = min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
> -                                          roundup(newsize - inode->fragment_size,
> -                                                  erofs_blksiz(sbi)));
> +                ictx->pclustersize = min_t(erofs_off_t,
> +                                           z_erofs_get_max_pclustersize(inode),
> +                                           roundup(newsize - inode->fragment_size,
> +                                                   erofs_blksiz(sbi)));
>                  return false;
>          }
>
> @@ -436,29 +496,32 @@ static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx *ctx,
>          return true;
>  }
>
> -static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
> +static int __z_erofs_compress_one(struct z_erofs_compress_sctx *ctx,
>                                    struct z_erofs_inmem_extent *e)
>  {
> -        static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
> -        struct erofs_inode *inode = ctx->inode;
> +        static char g_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
> +        char *dstbuf = ctx->destbuf ?: g_dstbuf;
> +        struct z_erofs_compress_ictx *ictx = ctx->ictx;
> +        struct erofs_inode *inode = ictx->inode;
>          struct erofs_sb_info *sbi = inode->sbi;
>          unsigned int blksz = erofs_blksiz(sbi);
>          char *const dst = dstbuf + blksz;
> -        struct erofs_compress *const h = &ctx->ccfg->handle;
> +        struct erofs_compress *const h = ctx->chandle;
>          unsigned int len = ctx->tail - ctx->head;
>          bool is_packed_inode = erofs_is_packed_inode(inode);
>          bool final = !ctx->remaining;
> -        bool may_packing = (cfg.c_fragments && final && !is_packed_inode);
> +        bool may_packing = (cfg.c_fragments && final && !is_packed_inode &&
> +                            !z_erofs_mt_enabled);
>          bool may_inline = (cfg.c_ztailpacking && final && !may_packing);
>          unsigned int compressedsize;
>          int ret;
>
> -        if (len <= ctx->pclustersize) {
> +        if (len <= ictx->pclustersize) {
>                  if (!final || !len)
>                          return 1;
>                  if (may_packing) {
> -                        if (inode->fragment_size && !ctx->fix_dedupedfrag) {
> -                                ctx->pclustersize = roundup(len, blksz);
> +                        if (inode->fragment_size && !ictx->fix_dedupedfrag) {
> +                                ictx->pclustersize = roundup(len, blksz);
>                                  goto fix_dedupedfrag;
>                          }
>                          e->length = len;
> @@ -470,7 +533,7 @@ static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
>
>          e->length = min(len, cfg.c_max_decompressed_extent_bytes);
>          ret = erofs_compress_destsize(h, ctx->queue + ctx->head,
> -                                      &e->length, dst, ctx->pclustersize);
> +                                      &e->length, dst, ictx->pclustersize);
>          if (ret <= 0) {
>                  erofs_err("failed to compress %s: %s", inode->i_srcpath,
>                            erofs_strerror(ret));
> @@ -507,16 +570,16 @@ nocompression:
>                  e->compressedblks = 1;
>                  e->raw = true;
>          } else if (may_packing && len == e->length &&
> -                   compressedsize < ctx->pclustersize &&
> -                   (!inode->fragment_size || ctx->fix_dedupedfrag)) {
> +                   compressedsize < ictx->pclustersize &&
> +                   (!inode->fragment_size || ictx->fix_dedupedfrag)) {
> frag_packing:
>                  ret = z_erofs_pack_fragments(inode, ctx->queue + ctx->head,
> -                                             len, ctx->tof_chksum);
> +                                             len, ictx->tof_chksum);
>                  if (ret < 0)
>                          return ret;
>                  e->compressedblks = 0; /* indicate a fragment */
>                  e->raw = false;
> -                ctx->fragemitted = true;
> +                ictx->fragemitted = true;
>          /* tailpcluster should be less than 1 block */
>          } else if (may_inline && len == e->length && compressedsize < blksz) {
>                  if (ctx->clusterofs + len <= blksz) {
> @@ -545,8 +608,8 @@ frag_packing:
>                   */
>                  if (may_packing && len == e->length &&
>                      (compressedsize & (blksz - 1)) &&
> -                    ctx->tail < sizeof(ctx->queue)) {
> -                        ctx->pclustersize = roundup(compressedsize, blksz);
> +                    ctx->tail < Z_EROFS_COMPR_QUEUE_SZ) {
> +                        ictx->pclustersize = roundup(compressedsize, blksz);
>                          goto fix_dedupedfrag;
>                  }
>
> @@ -569,34 +632,45 @@ frag_packing:
>                  }
>
>                  /* write compressed data */
> -                erofs_dbg("Writing %u compressed data to %u of %u blocks",
> -                          e->length, ctx->blkaddr, e->compressedblks);
> +                if (ctx->membuf) {
> +                        erofs_off_t sz = e->compressedblks * blksz;
>
> -                ret = blk_write(sbi, dst - padding, ctx->blkaddr,
> -                                e->compressedblks);
> -                if (ret)
> -                        return ret;
> +                        erofs_dbg("Writing %u compressed data to membuf of %u blocks",
> +                                  e->length, e->compressedblks);
> +
> +                        memcpy(ctx->membuf + ctx->memoff, dst - padding, sz);
> +                        ctx->memoff += sz;
> +                } else {
> +                        erofs_dbg("Writing %u compressed data to %u of %u blocks",
> +                                  e->length, ctx->blkaddr, e->compressedblks);
> +
> +                        ret = blk_write(sbi, dst - padding, ctx->blkaddr,
> +                                        e->compressedblks);
> +                        if (ret)
> +                                return ret;
> +                }
>                  e->raw = false;
>                  may_inline = false;
>                  may_packing = false;
>          }
>          e->partial = false;
>          e->blkaddr = ctx->blkaddr;
> +        if (ctx->blkaddr != EROFS_NULL_ADDR)
> +                ctx->blkaddr += e->compressedblks;
>          if (!may_inline && !may_packing && !is_packed_inode)
>                  (void)z_erofs_dedupe_insert(e, ctx->queue + ctx->head);
> -        ctx->blkaddr += e->compressedblks;
>          ctx->head += e->length;
>          return 0;
>
> fix_dedupedfrag:
>          DBG_BUGON(!inode->fragment_size);
>          ctx->remaining += inode->fragment_size;
> -        ctx->fix_dedupedfrag = true;
> +        ictx->fix_dedupedfrag = true;
>          return 1;
>  }
>
> -static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
> +static int z_erofs_compress_one(struct z_erofs_compress_sctx *ctx)
>  {
> +        struct z_erofs_compress_ictx *ictx = ctx->ictx;
>          unsigned int len = ctx->tail - ctx->head;
>          struct z_erofs_extent_item *ei;
>
> @@ -624,7 +698,7 @@ static int z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx)
>
>                  len -= ei->e.length;
>                  ctx->pivot = ei;
> -                if (ctx->fix_dedupedfrag && !ctx->fragemitted &&
> +                if (ictx->fix_dedupedfrag && !ictx->fragemitted &&
>                      z_erofs_fixup_deduped_fragment(ctx, len))
>                          break;
>
> @@ -912,13 +986,268 @@ void z_erofs_drop_inline_pcluster(struct erofs_inode *inode)
>          inode->eof_tailraw = NULL;
>  }
>
> +int z_erofs_compress_segment(struct z_erofs_compress_sctx *ctx,
> +                             u64 offset, erofs_blk_t blkaddr)
> +{
> +        int fd = ctx->ictx->fd;
> +
> +        ctx->blkaddr = blkaddr;
> +        while (ctx->remaining) {
> +                const u64 rx = min_t(u64, ctx->remaining,
> +                                     Z_EROFS_COMPR_QUEUE_SZ - ctx->tail);
> +                int ret;
> +
> +                ret = (offset == -1 ?
> +                        read(fd, ctx->queue + ctx->tail, rx) :
> +                        pread(fd, ctx->queue + ctx->tail, rx, offset));
> +                if (ret != rx)
> +                        return -errno;
> +
> +                ctx->remaining -= rx;
> +                ctx->tail += rx;
> +                if (offset != -1)
> +                        offset += rx;
> +
> +                ret = z_erofs_compress_one(ctx);
> +                if (ret)
> +                        return ret;
> +        }
> +        DBG_BUGON(ctx->head != ctx->tail);
> +
> +        if (ctx->pivot) {
> +                z_erofs_commit_extent(ctx, ctx->pivot);
> +                ctx->pivot = NULL;
> +        }
> +        return 0;
> +}
> +
> +#ifdef EROFS_MT_ENABLED
> +void *z_erofs_mt_wq_tls_alloc(struct erofs_workqueue *wq, void *ptr)
> +{
> +        struct erofs_compress_wq_tls *tls;
> +
> +        tls = calloc(1, sizeof(*tls));
> +        if (!tls)
> +                return NULL;
> +
> +        tls->queue = malloc(Z_EROFS_COMPR_QUEUE_SZ);
> +        if (!tls->queue)
> +                goto err_free_priv;
> +
> +        tls->destbuf = calloc(1, EROFS_CONFIG_COMPR_MAX_SZ +
> +                              EROFS_MAX_BLOCK_SIZE);
> +        if (!tls->destbuf)
> +                goto err_free_queue;
> +
> +        tls->ccfg = calloc(EROFS_MAX_COMPR_CFGS, sizeof(*tls->ccfg));
> +        if (!tls->ccfg)
> +                goto err_free_destbuf;
> +        return tls;
> +
> +err_free_destbuf:
> +        free(tls->destbuf);
> +err_free_queue:
> +        free(tls->queue);
> +err_free_priv:
> +        free(tls);
> +        return NULL;
> +}
> +
> +int z_erofs_mt_wq_tls_init_compr(struct erofs_sb_info *sbi,
> +                                 struct erofs_compress_wq_tls *tls,
> +                                 unsigned int alg_id, char *alg_name,
> +                                 unsigned int comp_level,
> +                                 unsigned int dict_size)
> +{
> +        struct erofs_compress_cfg *lc = &tls->ccfg[alg_id];
> +        int ret;
> +
> +        if (likely(lc->enable))
> +                return 0;
> +
> +        ret = erofs_compressor_init(sbi, &lc->handle, alg_name,
> +                                    comp_level, dict_size);
> +        if (ret)
> +                return ret;
> +        lc->algorithmtype = alg_id;
> +        lc->enable = true;
> +        return 0;
> +}
> +
> +void *z_erofs_mt_wq_tls_free(struct erofs_workqueue *wq, void *priv)
> +{
> +        struct erofs_compress_wq_tls *tls = priv;
> +        int i;
> +
> +        for (i = 0; i < EROFS_MAX_COMPR_CFGS; i++)
> +                if (tls->ccfg[i].enable)
> +                        erofs_compressor_exit(&tls->ccfg[i].handle);
> +
> +        free(tls->ccfg);
> +        free(tls->destbuf);
> +        free(tls->queue);
> +        free(tls);
> +        return NULL;
> +}
> +
> +void z_erofs_mt_workfn(struct erofs_work *work, void *tlsp)
> +{
> +        struct erofs_compress_work *cwork = (struct erofs_compress_work *)work;
> +        struct erofs_compress_wq_tls *tls = tlsp;
> +        struct z_erofs_compress_sctx *ctx = &cwork->ctx;
> +        u64 offset = ctx->seg_idx * cfg.c_segment_size;
> +        int ret = 0;
> +
> +        ret = z_erofs_mt_wq_tls_init_compr(ctx->ictx->inode->sbi, tls,
> +                                           cwork->alg_id, cwork->alg_name,
> +                                           cwork->comp_level,
> +                                           cwork->dict_size);
> +        if (ret)
> +                goto out;
> +
> +        ctx->queue = tls->queue;
> +        ctx->destbuf = tls->destbuf;
> +        ctx->chandle = &tls->ccfg[cwork->alg_id].handle;
> +
> +        ctx->membuf = malloc(ctx->remaining);
> +        if (!ctx->membuf) {
> +                ret = -ENOMEM;
> +                goto out;
> +        }
> +        ctx->memoff = 0;
> +
> +        ret = z_erofs_compress_segment(ctx, offset, EROFS_NULL_ADDR);
> +
> +out:
> +        cwork->errcode = ret;
> +        pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
> +        ++z_erofs_mt_ctrl.nfini;
> +        pthread_cond_signal(&z_erofs_mt_ctrl.cond);
> +        pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
> +}
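The handshake at the end of z_erofs_mt_workfn() is a plain counted-completion
pattern: each worker bumps nfini under the mutex and signals, and the
submitter waits until the count reaches the number of queued segments. A
self-contained sketch of the same idea (illustrative only; the names below
are mine, not the patch's):

    #include <pthread.h>

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static int nfini;

    /* worker side: called once per finished segment (cf. z_erofs_mt_workfn) */
    static void mark_done(void)
    {
            pthread_mutex_lock(&lock);
            ++nfini;
            pthread_cond_signal(&cond);
            pthread_mutex_unlock(&lock);
    }

    /* main thread: blocks until all nsegs segments have completed
     * (cf. the wait loop in z_erofs_mt_compress() below) */
    static void wait_all(int nsegs)
    {
            pthread_mutex_lock(&lock);
            while (nfini != nsegs)
                    pthread_cond_wait(&cond, &lock);
            pthread_mutex_unlock(&lock);
    }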
> +
> +int z_erofs_merge_segment(struct z_erofs_compress_ictx *ictx,
> +                          struct z_erofs_compress_sctx *ctx)
> +{
> +        struct z_erofs_extent_item *ei, *n;
> +        struct erofs_sb_info *sbi = ictx->inode->sbi;
> +        erofs_blk_t blkoff = 0;
> +        int ret = 0, ret2;
> +
> +        list_for_each_entry_safe(ei, n, &ctx->extents, list) {
> +                list_del(&ei->list);
> +                list_add_tail(&ei->list, &ictx->extents);
> +
> +                if (ei->e.blkaddr != EROFS_NULL_ADDR)   /* deduped extents */
> +                        continue;
> +
> +                ei->e.blkaddr = ctx->blkaddr;
> +                ctx->blkaddr += ei->e.compressedblks;
> +
> +                ret2 = blk_write(sbi, ctx->membuf + blkoff * erofs_blksiz(sbi),
> +                                 ei->e.blkaddr, ei->e.compressedblks);
> +                blkoff += ei->e.compressedblks;
> +                if (ret2) {
> +                        ret = ret2;
> +                        continue;
> +                }
> +        }
> +        free(ctx->membuf);
> +        return ret;
> +}
> +
> +int z_erofs_mt_compress(struct z_erofs_compress_ictx *ctx,
> +                        struct erofs_compress_cfg *ccfg,
> +                        erofs_blk_t blkaddr,
> +                        erofs_blk_t *compressed_blocks)
> +{
> +        struct erofs_compress_work *cur, *head = NULL, **last = &head;
> +        struct erofs_inode *inode = ctx->inode;
> +        int nsegs = DIV_ROUND_UP(inode->i_size, cfg.c_segment_size);
> +        int ret, i;
> +
> +        z_erofs_mt_ctrl.nfini = 0;
> +
> +        for (i = 0; i < nsegs; i++) {
> +                if (z_erofs_mt_ctrl.idle) {
> +                        cur = z_erofs_mt_ctrl.idle;
> +                        z_erofs_mt_ctrl.idle = cur->next;
> +                        cur->next = NULL;
> +                } else {
> +                        cur = calloc(1, sizeof(*cur));
> +                        if (!cur)
> +                                return -ENOMEM;
> +                }
> +                *last = cur;
> +                last = &cur->next;
> +
> +                cur->ctx = (struct z_erofs_compress_sctx) {
> +                        .ictx = ctx,
> +                        .seg_num = nsegs,
> +                        .seg_idx = i,
> +                        .pivot = &dummy_pivot,
> +                };
> +                init_list_head(&cur->ctx.extents);
> +
> +                if (i == nsegs - 1)
> +                        cur->ctx.remaining = inode->i_size -
> +                                             inode->fragment_size -
> +                                             i * cfg.c_segment_size;
> +                else
> +                        cur->ctx.remaining = cfg.c_segment_size;
> +
> +                cur->alg_id = ccfg->handle.alg->id;
> +                cur->alg_name = ccfg->handle.alg->name;
> +                cur->comp_level = ccfg->handle.compression_level;
> +                cur->dict_size = ccfg->handle.dict_size;
> +
> +                cur->work.fn = z_erofs_mt_workfn;
> +                erofs_queue_work(&z_erofs_mt_ctrl.wq, &cur->work);
> +        }
> +
> +        pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
> +        while (z_erofs_mt_ctrl.nfini != nsegs)
> +                pthread_cond_wait(&z_erofs_mt_ctrl.cond,
> +                                  &z_erofs_mt_ctrl.mutex);
> +        pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
> +
> +        ret = 0;
> +        while (head) {
> +                cur = head;
> +                head = cur->next;
> +
> +                if (cur->errcode) {
> +                        ret = cur->errcode;
> +                } else {
> +                        int ret2;
> +
> +                        cur->ctx.blkaddr = blkaddr;
> +                        ret2 = z_erofs_merge_segment(ctx, &cur->ctx);
> +                        if (ret2)
> +                                ret = ret2;
> +
> +                        *compressed_blocks += cur->ctx.blkaddr - blkaddr;
> +                        blkaddr = cur->ctx.blkaddr;
> +                }
> +
> +                cur->next = z_erofs_mt_ctrl.idle;
> +                z_erofs_mt_ctrl.idle = cur;
> +        }
> +        return ret;
> +}
> +#endif
> +
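Also worth calling out: z_erofs_mt_compress() recycles finished
erofs_compress_work items through the z_erofs_mt_ctrl.idle singly-linked
list instead of freeing them, so the per-file work allocations amortize
away. The pattern in isolation (illustrative only; only the main thread
touches the list here, so no locking is needed):

    #include <stdlib.h>

    struct work_item {
            struct work_item *next;
            /* ... payload ... */
    };

    static struct work_item *idle;  /* cf. z_erofs_mt_ctrl.idle */

    static struct work_item *get_work(void)
    {
            struct work_item *w = idle;

            if (w) {                /* pop a recycled item */
                    idle = w->next;
                    w->next = NULL;
            } else {                /* or allocate a fresh one */
                    w = calloc(1, sizeof(*w));
            }
            return w;
    }

    static void put_work(struct work_item *w)
    {
            w->next = idle;         /* push back for the next file */
            idle = w;
    }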
>  int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
>  {
> +        static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
>          struct erofs_buffer_head *bh;
> -        static struct z_erofs_vle_compress_ctx ctx;
> -        erofs_blk_t blkaddr, compressed_blocks;
> +        static struct z_erofs_compress_ictx ctx;
> +        static struct z_erofs_compress_sctx sctx;
> +        struct erofs_compress_cfg *ccfg;
> +        erofs_blk_t blkaddr, compressed_blocks = 0;
>          unsigned int legacymetasize;
>          int ret;
> +        bool ismt = false;
>          struct erofs_sb_info *sbi = inode->sbi;
>          u8 *compressmeta = malloc(BLK_ROUND_UP(sbi, inode->i_size) *
>                                    sizeof(struct z_erofs_lcluster_index) +
> @@ -963,8 +1292,8 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
>                  }
>          }
>  #endif
> -        ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
> -        inode->z_algorithmtype[0] = ctx.ccfg[0].algorithmtype;
> +        ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
> +        inode->z_algorithmtype[0] = ccfg[0].algorithmtype;
>          inode->z_algorithmtype[1] = 0;
>
>          inode->idata_size = 0;
> @@ -983,50 +1312,45 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
>          blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
>          ctx.inode = inode;
>          ctx.pclustersize = z_erofs_get_max_pclustersize(inode);
> -        ctx.blkaddr = blkaddr;
>          ctx.metacur = compressmeta + Z_EROFS_LEGACY_MAP_HEADER_SIZE;
> -        ctx.head = ctx.tail = 0;
> -        ctx.clusterofs = 0;
> -        ctx.pivot = &dummy_pivot;
>          init_list_head(&ctx.extents);
> -        ctx.remaining = inode->i_size - inode->fragment_size;
> +        ctx.fd = fd;
>          ctx.fix_dedupedfrag = false;
>          ctx.fragemitted = false;
> +        sctx = (struct z_erofs_compress_sctx) { .ictx = &ctx, };
> +        init_list_head(&sctx.extents);
> +
>          if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
>              !inode->fragment_size) {
>                  ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
>                  if (ret)
>                          goto err_free_idata;
> +#ifdef EROFS_MT_ENABLED
> +        } else if (z_erofs_mt_enabled && inode->i_size > cfg.c_segment_size) {
> +                ismt = true;
> +                ret = z_erofs_mt_compress(&ctx, ccfg, blkaddr, &compressed_blocks);
> +                if (ret)
> +                        goto err_free_idata;
> +#endif
>          } else {
> -                while (ctx.remaining) {
> -                        const u64 rx = min_t(u64, ctx.remaining,
> -                                             sizeof(ctx.queue) - ctx.tail);
> -
> -                        ret = read(fd, ctx.queue + ctx.tail, rx);
> -                        if (ret != rx) {
> -                                ret = -errno;
> -                                goto err_bdrop;
> -                        }
> -                        ctx.remaining -= rx;
> -                        ctx.tail += rx;
> -
> -                        ret = z_erofs_compress_one(&ctx);
> -                        if (ret)
> -                                goto err_free_idata;
> -                }
> +                sctx.queue = g_queue;
> +                sctx.destbuf = NULL;
> +                sctx.chandle = &ccfg->handle;
> +                sctx.remaining = inode->i_size - inode->fragment_size;
> +                sctx.seg_num = 1;
> +                sctx.seg_idx = 0;
> +                sctx.pivot = &dummy_pivot;
> +
> +                ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
> +                if (ret)
> +                        goto err_free_idata;
> +                compressed_blocks = sctx.blkaddr - blkaddr;
>          }
> -        DBG_BUGON(ctx.head != ctx.tail);
>
>          /* fall back to no compression mode */
> -        compressed_blocks = ctx.blkaddr - blkaddr;
>          DBG_BUGON(compressed_blocks < !!inode->idata_size);
>          compressed_blocks -= !!inode->idata_size;
>
> -        if (ctx.pivot) {
> -                z_erofs_commit_extent(&ctx, ctx.pivot);
> -                ctx.pivot = NULL;
> -        }
> -
>          /* generate an extent for the deduplicated fragment */
>          if (inode->fragment_size && !ctx.fragemitted) {
>                  struct z_erofs_extent_item *ei;
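One more note on how the two paths above meet: a worker compresses into
ctx->membuf with blkaddr == EROFS_NULL_ADDR, so real on-disk addresses are
only assigned by the main thread in z_erofs_merge_segment(), in segment
order. A stripped-down sketch of that deferred assignment (illustrative
only; NULL_ADDR and struct extent are stand-ins, and the membuf writeback
is elided):

    #include <stdint.h>

    #define NULL_ADDR ((uint32_t)-1)        /* stands in for EROFS_NULL_ADDR */

    struct extent {
            uint32_t blkaddr;               /* NULL_ADDR until merged */
            uint32_t compressedblks;
    };

    /* main thread: assign real block addresses in submission order */
    static uint32_t merge_assign(struct extent *ev, int n, uint32_t blkaddr)
    {
            for (int i = 0; i < n; i++) {
                    if (ev[i].blkaddr != NULL_ADDR) /* e.g. a deduped extent */
                            continue;
                    ev[i].blkaddr = blkaddr;
                    blkaddr += ev[i].compressedblks;
            }
            return blkaddr;                 /* next free block address */
    }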
> @@ -1042,13 +1366,16 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd)
>                          .compressedblks = 0,
>                          .raw = false,
>                          .partial = false,
> -                        .blkaddr = ctx.blkaddr,
> +                        .blkaddr = sctx.blkaddr,
>                  };
>                  init_list_head(&ei->list);
> -                z_erofs_commit_extent(&ctx, ei);
> +                z_erofs_commit_extent(&sctx, ei);
>          }
>          z_erofs_fragments_commit(inode);
>
> +        if (!ismt)
> +                list_splice_tail(&sctx.extents, &ctx.extents);
> +
>          z_erofs_write_indexes(&ctx);
>          legacymetasize = ctx.metacur - compressmeta;
>          /* estimate if data compression saves space or not */
> @@ -1257,8 +1584,25 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
>                  return -EINVAL;
>          }
>
> -        if (erofs_sb_has_compr_cfgs(sbi))
> -                return z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
> +        if (erofs_sb_has_compr_cfgs(sbi)) {
> +                ret = z_erofs_build_compr_cfgs(sbi, sb_bh, max_dict_size);
> +                if (ret)
> +                        return ret;
> +        }
> +
> +        z_erofs_mt_enabled = false;
> +#ifdef EROFS_MT_ENABLED
> +        if (cfg.c_mt_workers > 1) {
> +                pthread_mutex_init(&z_erofs_mt_ctrl.mutex, NULL);
> +                pthread_cond_init(&z_erofs_mt_ctrl.cond, NULL);
> +                ret = erofs_alloc_workqueue(&z_erofs_mt_ctrl.wq,
> +                                            cfg.c_mt_workers,
> +                                            cfg.c_mt_workers << 2,
> +                                            z_erofs_mt_wq_tls_alloc,
> +                                            z_erofs_mt_wq_tls_free);
> +                z_erofs_mt_enabled = !ret;
> +        }
> +#endif
>          return 0;
>  }
>
> @@ -1271,5 +1615,19 @@ int z_erofs_compress_exit(void)
>                  if (ret)
>                          return ret;
>          }
> +
> +        if (z_erofs_mt_enabled) {
> +#ifdef EROFS_MT_ENABLED
> +                ret = erofs_destroy_workqueue(&z_erofs_mt_ctrl.wq);
> +                if (ret)
> +                        return ret;
> +                while (z_erofs_mt_ctrl.idle) {
> +                        struct erofs_compress_work *tmp =
> +                                z_erofs_mt_ctrl.idle->next;
> +                        free(z_erofs_mt_ctrl.idle);
> +                        z_erofs_mt_ctrl.idle = tmp;
> +                }
> +#endif
> +        }
>          return 0;
>  }
> diff --git a/lib/compressor.c b/lib/compressor.c
> index 58eae2a..175259e 100644
> --- a/lib/compressor.c
> +++ b/lib/compressor.c
> @@ -86,6 +86,8 @@ int erofs_compressor_init(struct erofs_sb_info *sbi, struct erofs_compress *c,
>
>          /* should be written in "minimum compression ratio * 100" */
>          c->compress_threshold = 100;
> +        c->compression_level = -1;
> +        c->dict_size = 0;
>
>          if (!alg_name) {
>                  c->alg = NULL;
> diff --git a/mkfs/main.c b/mkfs/main.c
> index 126a049..5dbaf9f 100644
> --- a/mkfs/main.c
> +++ b/mkfs/main.c
> @@ -678,7 +678,7 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
>
>                          processors = erofs_get_available_processors();
>                          if (cfg.c_mt_workers > processors)
> -                                erofs_warn("the number of workers %d is more than the number of processors %d, performance may be impacted.",
> +                                erofs_warn("%d workers exceed %d processors, potentially impacting performance.",
>                                             cfg.c_mt_workers, processors);
>                          break;
>                  }
> @@ -838,6 +838,12 @@ static int mkfs_parse_options_cfg(int argc, char *argv[])
>                  }
>                  cfg.c_pclusterblks_packed = pclustersize_packed >> sbi.blkszbits;
>          }
> +#ifdef EROFS_MT_ENABLED
> +        if (cfg.c_mt_workers > 1 && (cfg.c_dedupe || cfg.c_fragments)) {
> +                erofs_warn("Note that dedupe/fragments are NOT supported in multi-threaded mode for now, reseting --workers=1.");
> +                cfg.c_mt_workers = 1;
> +        }
> +#endif
>          return 0;
>  }
>
> --
> 2.39.3
>