First, we should definitely add Javadoc to `ThreadPools.newWorkerPool` describing its shutdown-hook behavior, since that behavior is not obvious from the method name. I would actually go further and deprecate `newWorkerPool` in favor of `newExitingWorkerPool`. The name `newWorkerPool` invites misuse, because the intention is not obvious from it.
`newNonExitingWorkerPool` is a little awkward to me. `NonExiting` should be the default behavior. Maybe we can call this new method `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can just make `ThreadPools.newDaemonThreadFactory` public, as the proposed `newNonExitingWorkerPool` really just saves one line on the thread factory construction.

On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Here are the cases where we call `newWorkerPool` in our code:
>
> - Correctly:
>   - S3FileIO.executorService
>   - HadoopFileIO.executorService
> - Incorrectly:
>   - CountersBenchmark.defaultCounterMultipleThreads (core module)
>   - BaseDistributedDataScan.newMonitorPool (core module)
>   - FlinkInputFormat.createInputSplits (flink module)
>   - IcebergInputFormat.getSplits (flink module)
> - Incorrectly, but typically called only once in the JVM lifecycle:
>   - TableMigrationUtil.migrationService - the pool management is abandoned, and nothing prevents multiple pool creations (data module)
>   - IcebergCommitter<init> (flink module)
>   - IcebergFilesCommitter.open (flink module)
>   - IcebergSource.planSplitsForBatch (flink module)
>   - StreamingMonitorFunction.open (flink module)
>   - ContinuousSplitPlannerImpl<init> (flink module)
>   - Coordinator<init> - Kafka coordinator - I'm not sure that this belongs here (kafka-connect)
>
> The code we would need to duplicate in the core/data/flink/kafka modules is:
>
>     public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>       return Executors.newFixedThreadPool(
>           poolSize,
>           new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
>     }
>
> Maybe adding another utility method to `ThreadPools` would help future contributors think twice about the need for the `Exiting` solution, so I would prefer to add this method to the core `ThreadPools` with enough Javadoc to highlight the intended usage.
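For illustration, the split between a plain caller-managed pool and an "exiting" pool might look roughly like the following sketch. The class and method shapes here are assumptions mirroring the names proposed in the thread, not the actual Iceberg API, and a plain `Runtime.addShutdownHook` call stands in for Guava's `MoreExecutors.getExitingExecutorService`, which the real `ThreadPools` uses internally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposed API split; not the real ThreadPools class.
public class ThreadPoolsSketch {

  // Daemon thread factory, similar in spirit to ThreadPools.newDaemonThreadFactory.
  static ThreadFactory daemonThreadFactory(String prefix) {
    AtomicInteger count = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + count.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }

  // Plain fixed-size daemon pool: the caller owns the lifecycle, no shutdown hook.
  static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
    return Executors.newFixedThreadPool(poolSize, daemonThreadFactory(prefix));
  }

  // Exiting variant: same pool, plus an explicit JVM shutdown hook.
  static ExecutorService newExitingWorkerPool(String prefix, int poolSize) {
    ExecutorService pool = newFixedThreadPool(poolSize, prefix);
    Runtime.getRuntime().addShutdownHook(new Thread(pool::shutdown));
    return pool;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = newFixedThreadPool(2, "worker");
    // Threads are named "worker-0", "worker-1", ...
    System.out.println(pool.submit(() -> Thread.currentThread().getName()).get());
    pool.shutdown();
  }
}
```

With this split, the method name makes the shutdown-hook registration an explicit, opt-in choice rather than a hidden side effect.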
>
> Thanks,
> Peter
>
> On Wed, Sep 18, 2024 at 23:26, rdb...@gmail.com <rdb...@gmail.com> wrote:
>
>> I think this is the intended behavior. The code calls `MoreExecutors.getExitingExecutorService` internally to ensure the pool exits. I think the right fix is for callers to create their own `ExecutorService` rather than using `newWorkerPool`. That allows for customization without making Iceberg more complicated. `ThreadPools` isn't doing anything special here. It's just a convenience method to create an exiting, fixed-size thread pool that runs daemon threads. If that's not what you're looking for, then isn't it reasonable to make your own convenience method?
>>
>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> This is not just a Flink issue; the calls are spread across multiple packages. We checked the code, and in many of the current use cases in the Iceberg repo the pool is not used in a static environment and is closed manually. In these cases we should switch to a thread pool without a shutdown hook, so I think we minimally need a utility method to create such a pool.
>>>
>>> The main question is:
>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>>
>>> If it is a bug, then we create a `newExitingWorkerPool` and change the callers to use the correct one.
>>> If it is a feature, then we create a `newNotExitingWorkerPool` (which is gross IMHO, but we should consider API compatibility) and change the callers to use the correct one.
>>>
>>> Thanks,
>>> Peter
>>>
>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>
>>>> Since we're using standard interfaces, maybe we should just document this behavior and you can control it by creating your own worker pool instead?
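The "create your own `ExecutorService`" approach suggested above can be sketched in a few lines of plain `java.util.concurrent` code. This is only an illustration of the pattern: the caller builds a daemon-thread, fixed-size pool with no shutdown hook and tears it down explicitly when done, so no hook accumulates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of a caller-owned pool: no shutdown hook, explicit lifecycle.
public class CallerOwnedPool {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
      Thread t = new Thread(runnable);
      t.setDaemon(true); // daemon threads, like ThreadPools' pools
      return t;
    });
    try {
      // Use the pool for some scoped work...
      System.out.println(pool.submit(() -> 6 * 7).get());
    } finally {
      // ...and close it ourselves instead of relying on a JVM shutdown hook.
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  }
}
```

The trade-off debated in the thread is exactly whether every caller should repeat this boilerplate or whether a core utility method should provide it.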
>>>>
>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>
>>>>> Bumping this thread a bit.
>>>>>
>>>>> Cleaning up the pool in non-static cases should be the responsibility of the user. If they want a pool that is closed by a hook when the JVM exits, they should explicitly "say" so, for example by calling `newExitingWorkerPool`.
>>>>>
>>>>> This is a behaviour change in the API, so I think we need feedback from the community before proceeding with it.
>>>>> What are your thoughts?
>>>>>
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>> On Fri, Sep 13, 2024 at 17:16, 冯佳捷 <laputafa...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> While investigating a metaspace memory leak in the Flink IcebergSource (https://github.com/apache/iceberg/pull/11073), a discussion with @pvary revealed that ThreadPools.newWorkerPool currently registers a shutdown hook via ExitingExecutorService for all created thread pools. While this ensures graceful shutdown of the pools when the JVM exits, it can lead to unnecessary shutdown hook accumulation, especially when the pool is explicitly closed within the application's lifecycle.
>>>>>>
>>>>>> I propose to modify ThreadPools.newWorkerPool to not register a shutdown hook by default. This would prevent potential issues where developers unintentionally register numerous shutdown hooks when using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>>> To retain the existing functionality for long-lived thread pools that require a shutdown hook, I suggest introducing a new, more descriptive function, such as newExitingWorkerPool. This function would explicitly create thread pools that are registered with a shutdown hook.
>>>>>>
>>>>>> This change may impact users who rely on the implicit shutdown hook registration provided by the current ThreadPools.newWorkerPool implementation.
>>>>>> I would like to gather feedback from the community on this proposed change, especially regarding potential compatibility concerns.
>>>>>>
>>>>>> Best regards,
>>>>>> Feng Jiajie
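The shutdown-hook accumulation described in the original report can be illustrated with a small self-contained sketch. Note the assumptions: hooks are recorded in our own list purely to make the count observable (the JDK offers no public way to enumerate registered hooks), and the plain `Runtime.addShutdownHook` call stands in for what Guava's `ExitingExecutorService` does internally. The key point is that shutting the pool down does not unregister its hook, so each short-lived pool leaves one behind until JVM exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Demonstration of hook accumulation for short-lived "exiting" pools.
public class HookAccumulation {
  // Bookkeeping for the demo only; real hooks live inside the JVM.
  static final List<Thread> registeredHooks = new ArrayList<>();

  static ExecutorService newExitingPool() {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Thread hook = new Thread(pool::shutdown);
    Runtime.getRuntime().addShutdownHook(hook);
    registeredHooks.add(hook);
    return pool;
  }

  public static void main(String[] args) {
    // Short-lived pools, e.g. one per split-planning call:
    for (int i = 0; i < 3; i++) {
      ExecutorService pool = newExitingPool();
      pool.shutdown(); // the caller cleans up the pool...
      // ...but its shutdown hook stays registered until JVM exit.
    }
    System.out.println("hooks still registered: " + registeredHooks.size());
  }
}
```

In a long-running JVM that repeatedly creates such pools (and, in Flink's case, reloads user classloaders), these lingering hooks accumulate and can pin classloaders, matching the metaspace leak described above.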