Here are the cases where we call the `newWorkerPool` in our code:
- Correctly:
  - S3FileIO.executorService
  - HadoopFileIO.executorService
- Incorrectly:
  - CountersBenchmark.defaultCounterMultipleThreads (core module)
  - BaseDistributedDataScan.newMonitorPool (core module)
  - FlinkInputFormat.createInputSplits (flink module)
  - IcebergInputFormat.getSplits (flink module)
- Incorrectly, but typically called only once in the JVM lifecycle:
  - TableMigrationUtil.migrationService - the pool management is abandoned, and nothing prevents multiple pool creations (data module)
  - IcebergCommitter<init> (flink module)
  - IcebergFilesCommitter.open (flink module)
  - IcebergSource.planSplitsForBatch (flink module)
  - StreamingMonitorFunction.open (flink module)
  - ContinuousSplitPlannerImpl<init> (flink module)
  - Coordinator<init> - Kafka coordinator - I'm not sure that this belongs here (kafka-connect)
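For context on why the "correctly"/"incorrectly" split matters: `newWorkerPool` hands the fixed pool to Guava's `MoreExecutors.getExitingExecutorService`, which registers a JVM shutdown hook for it. Below is a JDK-only approximation of the two behaviors; the class and method names are illustrative, not Iceberg's actual implementation, and the real code uses Guava rather than a hand-rolled hook:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerPoolSketch {
  // Daemon threads named like "prefix-0", "prefix-1", ...
  static ThreadFactory daemonFactory(String namePrefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // Plain fixed pool: no shutdown hook, the caller owns the lifecycle.
  // This is what the explicitly closed call sites above actually need.
  static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
    return Executors.newFixedThreadPool(poolSize, daemonFactory(namePrefix));
  }

  // "Exiting" pool: additionally registers a JVM shutdown hook that drains
  // the pool on exit, roughly what Guava's getExitingExecutorService does.
  static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
    ExecutorService pool = newNonExitingWorkerPool(namePrefix, poolSize);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      pool.shutdown();
      try {
        pool.awaitTermination(120, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }));
    return pool;
  }
}
```

The "correctly" call sites hold their pool for the whole JVM lifetime, so the hook earns its keep; the "incorrectly" ones shut the pool down themselves, leaving the registered hook behind with nothing useful to do.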
The code we need to duplicate in the core/data/flink/kafka modules is:

```java
public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
  return Executors.newFixedThreadPool(
      poolSize,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
}
```

Maybe adding another utility method to `ThreadPools` would help future contributors think twice about the need for the `Exiting` solution, so I would prefer to add this method to the core `ThreadPools` with enough javadoc to highlight the intended usage.

Thanks,
Peter

rdb...@gmail.com <rdb...@gmail.com> wrote (on Wed, Sep 18, 2024, at 23:26):

> I think this is the intended behavior. The code calls
> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
> exits. I think the right fix is for callers to create their own
> `ExecutorService` rather than using `newWorkerPool`. That allows for
> customization without making Iceberg more complicated. `ThreadPools` isn't
> doing anything special here. It's just a convenience method to create an
> exiting, fixed-size thread pool that runs daemon threads. If that's not
> what you're looking for, then isn't it reasonable to make your own
> convenience method?
>
> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> This is not just a Flink issue; the calls are spread out in multiple
>> packages. We checked the code, and in many of the current use cases in the
>> Iceberg repo the pool is not used in a static environment, and is closed
>> manually. In these cases we should switch to a thread pool without a
>> shutdown hook. So I think minimally we need to create a utility method to
>> create such a pool.
>>
>> The main question is:
>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>
>> If this is a bug, then we create a "newExitingWorkerPool", and change the
>> callers to use the correct one.
>> If this is a feature, then we create a "newNotExitingWorkerPool" (which
>> is gross IMHO, but we should consider API compatibility), and change the
>> callers to use the correct one.
>>
>> Thanks,
>> Peter
>>
>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>
>>> Since we're using standard interfaces, maybe we should just document
>>> this behavior and you can control it by creating your own worker pool
>>> instead?
>>>
>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Bumping this thread a bit.
>>>>
>>>> Cleaning up the pool in non-static cases should be the responsibility
>>>> of the user. If they want a pool which is closed by a hook when the JVM
>>>> exits, they should explicitly "say" so, for example by calling
>>>> "newExitingWorkerPool".
>>>>
>>>> This is a behaviour change in the API, so I think we need feedback from
>>>> the community before proceeding with it.
>>>> What are your thoughts?
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> 冯佳捷 <laputafa...@gmail.com> wrote (on Fri, Sep 13, 2024, at 17:16):
>>>>
>>>>> Hi all,
>>>>>
>>>>> During the investigation of a metaspace memory leak issue in Flink
>>>>> IcebergSource (https://github.com/apache/iceberg/pull/11073), a
>>>>> discussion with @pvary revealed that *ThreadPools.newWorkerPool*
>>>>> currently registers a Shutdown Hook via ExitingExecutorService for all
>>>>> created thread pools. While this ensures graceful shutdown of the pools
>>>>> when the JVM exits, it might lead to unnecessary Shutdown Hook
>>>>> accumulation, especially when the pool is explicitly closed within the
>>>>> application's lifecycle.
>>>>>
>>>>> I propose to *modify ThreadPools.newWorkerPool to not register a
>>>>> Shutdown Hook by default*. This would prevent potential issues where
>>>>> developers might unintentionally register numerous Shutdown Hooks when
>>>>> using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>> To retain the existing functionality for long-lived thread pools that
>>>>> require a Shutdown Hook, I suggest introducing a new, more descriptive
>>>>> function, such as *newExitingWorkerPool*. This function would
>>>>> explicitly create thread pools that are registered with a Shutdown Hook.
>>>>>
>>>>> *This change might potentially impact users who rely on the implicit
>>>>> Shutdown Hook registration provided by the current
>>>>> ThreadPools.newWorkerPool implementation.*
>>>>> I would like to gather feedback from the community regarding this
>>>>> proposed change, especially regarding potential compatibility concerns.
>>>>>
>>>>> Best regards,
>>>>> Feng Jiajie
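The Shutdown Hook accumulation Feng Jiajie describes is easy to reproduce with plain JDK primitives. In the sketch below (hypothetical names; the real hook registration happens inside Guava's ExitingExecutorService, which keeps no handle the caller could deregister), every short-lived "exiting" pool leaves one hook thread registered even after the pool itself has been shut down:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HookAccumulationDemo {
  // Hooks are tracked in this list only so the effect is observable;
  // the real wrapper registers them and hands back no reference.
  static final List<Thread> registeredHooks = new ArrayList<>();

  static ExecutorService newExitingPool(int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Thread hook = new Thread(pool::shutdown);
    Runtime.getRuntime().addShutdownHook(hook);
    registeredHooks.add(hook);
    return pool;
  }

  public static void main(String[] args) {
    // One short-lived pool per "job": explicitly closing the pool
    // does not deregister its shutdown hook.
    for (int i = 0; i < 100; i++) {
      ExecutorService pool = newExitingPool(2);
      pool.shutdown();
    }
    System.out.println("hooks still registered: " + registeredHooks.size());
  }
}
```

In a long-running process that repeatedly creates and closes such pools, the retained hooks keep their referents reachable until the JVM exits, which is plausibly how the metaspace leak in the linked PR manifests.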