> I don't think that solves the problem that these are used more widely than intended and without people knowing the behavior.

Ryan, to solve this problem, I suggest we deprecate the current `newWorkerPool` in favor of `newExitingWorkerPool`. That way, when people call `newExitingWorkerPool`, the intended behavior is clear from the method name.

On Fri, Sep 27, 2024 at 1:58 PM rdb...@gmail.com <rdb...@gmail.com> wrote:

> I'm okay with adding newFixedThreadPool as Steven suggests, but I don't think that solves the problem that these are used more widely than intended and without people knowing the behavior. Even though "non-exiting" is awkward, it may be a good option to call out the behavior. +1 for Javadoc, and +1 for doing something here, since there are improper uses throughout Iceberg. Thanks for raising this, Peter!
>
> On Thu, Sep 26, 2024 at 1:52 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
>> Hi Steven,
>>
>> I agree with you here. I think we can use semantics similar to ThreadPoolExecutor/ScheduledThreadPoolExecutor (like newFixedThreadPool, newWorkStealingPool, ...).
>>
>> Regards
>> JB
>>
>> On Thu, Sep 26, 2024 at 2:05 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>> First, we should definitely add Javadoc to `ThreadPools.newWorkerPool` on its behavior with a shutdown hook. It is not obvious from the method name. I would actually go further and deprecate `newWorkerPool` in favor of `newExitingWorkerPool`. The `newWorkerPool` method name invites misuse, as the intention is not obvious from the name.
>>>
>>> `newNonExitingWorkerPool` is a little awkward to me. `NonExiting` should be the default behavior. Maybe we can call this new method `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can just make `ThreadPools.newDaemonThreadFactory` public, as the proposed `newNonExitingWorkerPool` really just saves one line on the thread factory construction.
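To make the proposals in this thread concrete, the API split being discussed could look roughly like the sketch below. It is stdlib-only and illustrative: the real Iceberg `ThreadPools` builds on Guava's `ThreadFactoryBuilder` and `MoreExecutors.getExitingExecutorService`, so the factory and hook bodies here are assumptions that only mimic that behavior, and the class name `ThreadPoolsSketch` is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolsSketch {

  // Mimics ThreadPools.newDaemonThreadFactory: daemon threads named "<prefix>-N".
  static ThreadFactory daemonThreadFactory(String prefix) {
    AtomicInteger count = new AtomicInteger();
    return runnable -> {
      Thread t = new Thread(runnable, prefix + "-" + count.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }

  // Steven's proposed plain pool: no shutdown hook; the caller owns the lifecycle
  // and must shut the pool down itself.
  static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
    return Executors.newFixedThreadPool(poolSize, daemonThreadFactory(prefix));
  }

  // The explicit "exiting" variant: registers one JVM shutdown hook per pool,
  // which is roughly what Guava's MoreExecutors.getExitingExecutorService does.
  static ExecutorService newExitingWorkerPool(String prefix, int poolSize) {
    ExecutorService pool = newFixedThreadPool(poolSize, prefix);
    Runtime.getRuntime().addShutdownHook(new Thread(pool::shutdown));
    return pool;
  }

  // Deprecated alias kept for API compatibility; the explicit name makes the
  // shutdown-hook behavior visible at the call site.
  @Deprecated
  static ExecutorService newWorkerPool(String prefix, int poolSize) {
    return newExitingWorkerPool(prefix, poolSize);
  }
}
```

Callers that close the pool within their own lifecycle would use `newFixedThreadPool`; only genuinely JVM-lifetime pools would opt into `newExitingWorkerPool`.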
>>>
>>> On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>
>>>> Here are the cases where we call `newWorkerPool` in our code:
>>>>
>>>> Correctly:
>>>>
>>>> S3FileIO.executorService
>>>> HadoopFileIO.executorService
>>>>
>>>> Incorrectly:
>>>>
>>>> CountersBenchmark.defaultCounterMultipleThreads (core module)
>>>> BaseDistributedDataScan.newMonitorPool (core module)
>>>> FlinkInputFormat.createInputSplits (flink module)
>>>> IcebergInputFormat.getSplits (flink module)
>>>>
>>>> Incorrectly, but typically called only once in the JVM lifecycle:
>>>>
>>>> TableMigrationUtil.migrationService - the pool management is abandoned, and nothing prevents multiple pool creations (data module)
>>>> IcebergCommitter<init> (flink module)
>>>> IcebergFilesCommitter.open (flink module)
>>>> IcebergSource.planSplitsForBatch (flink module)
>>>> StreamingMonitorFunction.open (flink module)
>>>> ContinuousSplitPlannerImpl<init> (flink module)
>>>> Coordinator<init> - Kafka coordinator - I'm not sure that this belongs here (kafka-connect)
>>>>
>>>> The code we would need to duplicate in the core/data/flink/kafka modules is:
>>>>
>>>> public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>>>>   return Executors.newFixedThreadPool(
>>>>       poolSize,
>>>>       new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
>>>> }
>>>>
>>>> Maybe adding another utility method to `ThreadPools` would help future contributors think twice about the need for the `Exiting` solution, so I would prefer to add this method to the core `ThreadPools` with enough Javadoc to highlight the intended usage.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> On Wed, Sep 18, 2024 at 11:26 PM rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>>>
>>>>> I think this is the intended behavior.
>>>>> The code calls `MoreExecutors.getExitingExecutorService` internally to ensure the pool exits. I think the right fix is for callers to create their own `ExecutorService` rather than using `newWorkerPool`. That allows for customization without making Iceberg more complicated. `ThreadPools` isn't doing anything special here. It's just a convenience method to create an exiting, fixed-size thread pool that runs daemon threads. If that's not what you're looking for, then isn't it reasonable to make your own convenience method?
>>>>>
>>>>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>
>>>>>> This is not just a Flink issue; the calls are spread across multiple packages. We checked the code, and in many of the current use cases in the Iceberg repo the pool is not used in a static environment and is closed manually. In these cases we should switch to a thread pool without a shutdown hook. So I think we minimally need a utility method to create such a pool.
>>>>>>
>>>>>> The main question is:
>>>>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>>>>>
>>>>>> If it is a bug, then we create a "newExitingWorkerPool" and change the callers to use the correct one.
>>>>>> If it is a feature, then we create a "newNotExitingWorkerPool" (which is gross IMHO, but we should consider API compatibility) and change the callers to use the correct one.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>>>>>
>>>>>>> Since we're using standard interfaces, maybe we should just document this behavior, and you can control it by creating your own worker pool instead?
>>>>>>>
>>>>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Bumping this thread a bit.
>>>>>>>>
>>>>>>>> Cleaning up the pool in non-static cases should be the responsibility of the user. If they want a pool that is closed by a hook when the JVM exits, they should explicitly "say" so, for example by calling "newExitingWorkerPool".
>>>>>>>>
>>>>>>>> This is a behaviour change in the API, so I think we need feedback from the community before proceeding with it.
>>>>>>>> What are your thoughts?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Peter
>>>>>>>>
>>>>>>>> On Fri, Sep 13, 2024 at 5:16 PM 冯佳捷 <laputafa...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> While investigating a metaspace memory leak in the Flink IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a discussion with @pvary revealed that ThreadPools.newWorkerPool currently registers a shutdown hook via ExitingExecutorService for every thread pool it creates. While this ensures graceful shutdown of the pools when the JVM exits, it can lead to unnecessary shutdown hook accumulation, especially when the pool is explicitly closed within the application's lifecycle.
>>>>>>>>>
>>>>>>>>> I propose to modify ThreadPools.newWorkerPool so that it does not register a shutdown hook by default. This would prevent developers from unintentionally registering numerous shutdown hooks when using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>>>>>> To retain the existing functionality for long-lived thread pools that require a shutdown hook, I suggest introducing a new, more descriptive function, such as newExitingWorkerPool. This function would explicitly create thread pools that are registered with a shutdown hook.
>>>>>>>>>
>>>>>>>>> This change might impact users who rely on the implicit shutdown hook registration provided by the current ThreadPools.newWorkerPool implementation.
>>>>>>>>> I would like to gather feedback from the community regarding this proposed change, especially regarding potential compatibility concerns.
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>> Feng Jiajie
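The hook accumulation Feng describes can be demonstrated with a small stdlib-only sketch. The helper below registers one shutdown hook per pool, mimicking (not reproducing) what Guava's `getExitingExecutorService` does internally; `registerExitHook` and `leakedHooks` are hypothetical names introduced here for illustration. Even after each short-lived pool is fully shut down, its hook stays registered with the JVM until exit, which is why `Runtime.removeShutdownHook` still finds it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownHookLeakDemo {

  // Wraps a pool the way an "exiting" helper would: one shutdown hook per pool.
  static Thread registerExitHook(ExecutorService pool) {
    Thread hook = new Thread(pool::shutdown);
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }

  // Creates and fully shuts down n short-lived pools, returning the hooks
  // that were registered along the way.
  static List<Thread> leakedHooks(int n) throws InterruptedException {
    List<Thread> hooks = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ExecutorService pool = Executors.newFixedThreadPool(1);
      hooks.add(registerExitHook(pool));
      pool.shutdown(); // the caller closes the pool explicitly...
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
    // ...but nothing de-registers the hooks, so they accumulate until JVM exit.
    return hooks;
  }
}
```

Splitting the API into a plain pool (no hook) and an explicit exiting pool avoids this for pools whose lifecycle the caller already manages.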