From: Yifan Zhao <zhaoyi...@sjtu.edu.cn> This patch separates the compression process into two parts.
Specifically, erofs_begin_compressed_file() will trigger compression. erofs_write_compressed_file() will wait for compression to finish and write the compressed (meta)data. Note that erofs_begin_compressed_file() and erofs_write_compressed_file() may run on different threads even if the global inode context is used, so another synchronization point is added. Signed-off-by: Yifan Zhao <zhaoyi...@sjtu.edu.cn> Co-authored-by: Tong Xin <xin_t...@sjtu.edu.cn> Signed-off-by: Gao Xiang <hsiang...@linux.alibaba.com> --- include/erofs/compress.h | 5 +- lib/compress.c | 138 ++++++++++++++++++++++++++++----------- lib/inode.c | 17 ++++- 3 files changed, 118 insertions(+), 42 deletions(-) diff --git a/include/erofs/compress.h b/include/erofs/compress.h index 871db54..c9831a7 100644 --- a/include/erofs/compress.h +++ b/include/erofs/compress.h @@ -17,8 +17,11 @@ extern "C" #define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024) #define Z_EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2) +struct z_erofs_compress_ictx; + void z_erofs_drop_inline_pcluster(struct erofs_inode *inode); -int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos); +void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos); +int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx); int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *bh); diff --git a/lib/compress.c b/lib/compress.c index 4ac4760..7fef698 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -109,6 +109,7 @@ struct erofs_compress_work { static struct { struct erofs_workqueue wq; struct erofs_compress_work *idle; + pthread_mutex_t mutex; } z_erofs_mt_ctrl; #endif @@ -1312,11 +1313,13 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx) pthread_cond_init(&ictx->cond, NULL); for (i = 0; i < nsegs; i++) { + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); cur = z_erofs_mt_ctrl.idle; if (cur) { z_erofs_mt_ctrl.idle = cur->next; cur->next = NULL; } + 
pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); if (!cur) { cur = calloc(1, sizeof(*cur)); if (!cur) @@ -1364,8 +1367,10 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx) pthread_mutex_unlock(&ictx->mutex); bh = erofs_balloc(DATA, 0, 0, 0); - if (IS_ERR(bh)) - return PTR_ERR(bh); + if (IS_ERR(bh)) { + ret = PTR_ERR(bh); + goto out; + } DBG_BUGON(!head); blkaddr = erofs_mapbh(bh->block); @@ -1389,27 +1394,31 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx) blkaddr = cur->ctx.blkaddr; } + pthread_mutex_lock(&z_erofs_mt_ctrl.mutex); cur->next = z_erofs_mt_ctrl.idle; z_erofs_mt_ctrl.idle = cur; - } while(head); + pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex); + } while (head); if (ret) - return ret; - - return erofs_commit_compressed_file(ictx, bh, + goto out; + ret = erofs_commit_compressed_file(ictx, bh, blkaddr - compressed_blocks, compressed_blocks); + +out: + close(ictx->fd); + free(ictx); + return ret; } #endif -int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) +static struct z_erofs_compress_ictx g_ictx; + +void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) { - static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ]; - struct erofs_buffer_head *bh; - static struct z_erofs_compress_ictx ctx; - static struct z_erofs_compress_sctx sctx; - erofs_blk_t blkaddr; - int ret; struct erofs_sb_info *sbi = inode->sbi; + struct z_erofs_compress_ictx *ictx; + int ret; /* initialize per-file compression setting */ inode->z_advise = 0; @@ -1440,43 +1449,87 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) } } #endif - ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]]; - inode->z_algorithmtype[0] = ctx.ccfg->algorithmtype; - inode->z_algorithmtype[1] = 0; - inode->idata_size = 0; inode->fragment_size = 0; + if (z_erofs_mt_enabled) { + ictx = malloc(sizeof(*ictx)); + if (!ictx) + return ERR_PTR(-ENOMEM); + ictx->fd = dup(fd); + } else { +#ifdef EROFS_MT_ENABLED + 
pthread_mutex_lock(&g_ictx.mutex); + if (g_ictx.seg_num) + pthread_cond_wait(&g_ictx.cond, &g_ictx.mutex); + g_ictx.seg_num = 1; + pthread_mutex_unlock(&g_ictx.mutex); +#endif + ictx = &g_ictx; + ictx->fd = fd; + } + + ictx->ccfg = &erofs_ccfg[inode->z_algorithmtype[0]]; + inode->z_algorithmtype[0] = ictx->ccfg->algorithmtype; + inode->z_algorithmtype[1] = 0; + /* * Handle tails in advance to avoid writing duplicated * parts into the packed inode. */ if (cfg.c_fragments && !erofs_is_packed_inode(inode)) { - ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum); + ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum); if (ret < 0) - return ret; + goto err_free_ictx; } - ctx.inode = inode; - ctx.fd = fd; - ctx.fpos = fpos; - init_list_head(&ctx.extents); - ctx.fix_dedupedfrag = false; - ctx.fragemitted = false; + ictx->inode = inode; + ictx->fpos = fpos; + init_list_head(&ictx->extents); + ictx->fix_dedupedfrag = false; + ictx->fragemitted = false; if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) && !inode->fragment_size) { - ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum); + ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum); if (ret) goto err_free_idata; + } #ifdef EROFS_MT_ENABLED - } else if (z_erofs_mt_enabled) { - ret = z_erofs_mt_compress(&ctx); + if (ictx != &g_ictx) { + ret = z_erofs_mt_compress(ictx); if (ret) goto err_free_idata; - return erofs_mt_write_compressed_file(&ctx); + } #endif + return ictx; + +err_free_idata: + if (inode->idata) { + free(inode->idata); + inode->idata = NULL; } +err_free_ictx: + if (ictx != &g_ictx) + free(ictx); + return ERR_PTR(ret); +} + +int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx) +{ + static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ]; + struct erofs_buffer_head *bh; + static struct z_erofs_compress_sctx sctx; + struct erofs_compress_cfg *ccfg = ictx->ccfg; + struct erofs_inode *inode = ictx->inode; + erofs_blk_t blkaddr; + int ret; + +#ifdef EROFS_MT_ENABLED + if (ictx 
!= &g_ictx) + return erofs_mt_write_compressed_file(ictx); +#endif + /* allocate main data buffer */ bh = erofs_balloc(DATA, 0, 0, 0); if (IS_ERR(bh)) { @@ -1485,11 +1538,11 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) } blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */ - ctx.seg_num = 1; + ictx->seg_num = 1; sctx = (struct z_erofs_compress_sctx) { - .ictx = &ctx, + .ictx = ictx, .queue = g_queue, - .chandle = &ctx.ccfg->handle, + .chandle = &ccfg->handle, .remaining = inode->i_size - inode->fragment_size, .seg_idx = 0, .pivot = &dummy_pivot, @@ -1499,19 +1552,26 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos) ret = z_erofs_compress_segment(&sctx, -1, blkaddr); if (ret) - goto err_bdrop; - list_splice_tail(&sctx.extents, &ctx.extents); + goto err_free_idata; - return erofs_commit_compressed_file(&ctx, bh, blkaddr, - sctx.blkaddr - blkaddr); + list_splice_tail(&sctx.extents, &ictx->extents); + ret = erofs_commit_compressed_file(ictx, bh, blkaddr, + sctx.blkaddr - blkaddr); + goto out; -err_bdrop: - erofs_bdrop(bh, true); /* revoke buffer */ err_free_idata: + erofs_bdrop(bh, true); /* revoke buffer */ if (inode->idata) { free(inode->idata); inode->idata = NULL; } +out: +#ifdef EROFS_MT_ENABLED + pthread_mutex_lock(&ictx->mutex); + ictx->seg_num = 0; + pthread_cond_signal(&ictx->cond); + pthread_mutex_unlock(&ictx->mutex); +#endif return ret; } @@ -1666,6 +1726,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s z_erofs_mt_wq_tls_free); z_erofs_mt_enabled = !ret; } + pthread_mutex_init(&g_ictx.mutex, NULL); + pthread_cond_init(&g_ictx.cond, NULL); #endif return 0; } diff --git a/lib/inode.c b/lib/inode.c index 1ff05e1..0d044f4 100644 --- a/lib/inode.c +++ b/lib/inode.c @@ -499,10 +499,15 @@ int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos) DBG_BUGON(!inode->i_size); if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) { + void *ictx; 
int ret; - ret = erofs_write_compressed_file(inode, fd, fpos); - if (!ret || ret != -ENOSPC) + ictx = erofs_begin_compressed_file(inode, fd, fpos); + if (IS_ERR(ictx)) + return PTR_ERR(ictx); + + ret = erofs_write_compressed_file(ictx); + if (ret != -ENOSPC) return ret; if (lseek(fd, fpos, SEEK_SET) < 0) @@ -1362,6 +1367,7 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name) { struct stat st; struct erofs_inode *inode; + void *ictx; int ret; ret = lseek(fd, 0, SEEK_SET); @@ -1392,7 +1398,12 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name) inode->nid = inode->sbi->packed_nid; } - ret = erofs_write_compressed_file(inode, fd, 0); + ictx = erofs_begin_compressed_file(inode, fd, 0); + if (IS_ERR(ictx)) + return ERR_CAST(ictx); + + DBG_BUGON(!ictx); + ret = erofs_write_compressed_file(ictx); if (ret == -ENOSPC) { ret = lseek(fd, 0, SEEK_SET); if (ret < 0) -- 2.30.2