I'm okay with adding newFixedThreadPool as Steven suggests, but I don't think that solves the problem that these are used more widely than intended and without people knowing the behavior. Even though "non-exiting" is awkward, it may be a good option for calling out the behavior. +1 for Javadoc, and +1 for doing something here, since there are improper uses throughout Iceberg. Thanks for raising this, Peter!
On Thu, Sep 26, 2024 at 1:52 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Steven,
>
> I agree with you here. I think we can use semantics similar to
> ThreadPoolExecutor/ScheduledThreadPoolExecutor (like
> newFixedThreadPool, newWorkStealingPool, ...).
>
> Regards
> JB
>
> On Thu, Sep 26, 2024 at 2:05 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > First, we should definitely add Javadoc to `ThreadPools.newWorkerPool`
> > on its behavior with a shutdown hook. It is not obvious from the method
> > name. I would actually go further and deprecate `newWorkerPool` in favor
> > of `newExitingWorkerPool`. The `newWorkerPool` method name invites
> > misuse, as the intention is not obvious from the name.
> >
> > `newNonExitingWorkerPool` is a little awkward to me. `NonExiting` should
> > be the default behavior. Maybe we can call this new method
> > `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can
> > just make `ThreadPools.newDaemonThreadFactory` public, as the proposed
> > `newNonExitingWorkerPool` really just saves one line on the thread
> > factory construction.
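[Editor's note: Steven's alternative above can be sketched with JDK primitives only. The names `newDaemonThreadFactory` and `newFixedThreadPool(int, String)` follow the proposal in this thread, not Iceberg's actual API, and Iceberg's real implementation uses Guava's `ThreadFactoryBuilder` rather than a hand-rolled factory.]

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the proposed non-exiting helpers, assuming the names suggested
// in this thread. No shutdown hook is registered anywhere here.
public class DaemonThreads {

  // A public daemon thread factory: threads are named "<prefix>-<n>" and
  // marked daemon so they never keep the JVM alive on their own.
  public static ThreadFactory newDaemonThreadFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger(0);
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // A non-exiting fixed pool: the caller owns the lifecycle and must call
  // shutdown() explicitly, instead of relying on a JVM shutdown hook.
  public static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
    return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(prefix));
  }
}
```

This also shows Steven's point that making the factory public saves callers only one line: `Executors.newFixedThreadPool(size, factory)` is the whole remaining body.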
> >
> > On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>
> >> Here are the cases where we call `newWorkerPool` in our code:
> >>
> >> Correctly:
> >>
> >> S3FileIO.executorService
> >> HadoopFileIO.executorService
> >>
> >> Incorrectly:
> >>
> >> CountersBenchmark.defaultCounterMultipleThreads (core module)
> >> BaseDistributedDataScan.newMonitorPool (core module)
> >> FlinkInputFormat.createInputSplits (flink module)
> >> IcebergInputFormat.getSplits (flink module)
> >>
> >> Incorrectly, but typically called only once in the JVM lifecycle:
> >>
> >> TableMigrationUtil.migrationService - the pool management is abandoned, and nothing prevents multiple pool creations (data module)
> >> IcebergCommitter<init> (flink module)
> >> IcebergFilesCommitter.open (flink module)
> >> IcebergSource.planSplitsForBatch (flink module)
> >> StreamingMonitorFunction.open (flink module)
> >> ContinuousSplitPlannerImpl<init> (flink module)
> >> Coordinator<init> - Kafka coordinator - I'm not sure that this belongs here (kafka-connect)
> >>
> >> The code we would need to duplicate in the core/data/flink/kafka modules is:
> >>
> >> public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
> >>   return Executors.newFixedThreadPool(
> >>       poolSize,
> >>       new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
> >> }
> >>
> >> Maybe adding another utility method to `ThreadPools` would help future contributors think twice about whether they need the `Exiting` behavior, so I would prefer to add this method to the core `ThreadPools` with enough Javadoc to highlight the intended usage.
> >>
> >> Thanks,
> >> Peter
> >>
> >> rdb...@gmail.com <rdb...@gmail.com> wrote on Wed, Sep 18, 2024 at 23:26:
> >>>
> >>> I think this is the intended behavior. The code calls `MoreExecutors.getExitingExecutorService` internally to ensure the pool exits.
> >>> I think the right fix is for callers to create their own `ExecutorService` rather than using `newWorkerPool`. That allows for customization without making Iceberg more complicated. `ThreadPools` isn't doing anything special here. It's just a convenience method to create an exiting, fixed-size thread pool that runs daemon threads. If that's not what you're looking for, then isn't it reasonable to make your own convenience method?
> >>>
> >>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>>>
> >>>> This is not just a Flink issue; the calls are spread across multiple packages. We checked the code, and in many of the current use cases in the Iceberg repo the pool is not used in a static environment and is closed manually. In these cases we should switch to a thread pool without a shutdown hook. So I think we minimally need a utility method to create such a pool.
> >>>>
> >>>> The main question is:
> >>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
> >>>>
> >>>> If it is a bug, then we create a "newExitingWorkerPool" and change the callers to use the correct one.
> >>>> If it is a feature, then we create a "newNotExitingWorkerPool" (which is gross IMHO, but we should consider API compatibility) and change the callers to use the correct one.
> >>>>
> >>>> Thanks,
> >>>> Peter
> >>>>
> >>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
> >>>>>
> >>>>> Since we're using standard interfaces, maybe we should just document this behavior, and you can control it by creating your own worker pool instead?
> >>>>>
> >>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>>>>>
> >>>>>> Bumping this thread a bit.
> >>>>>>
> >>>>>> Cleaning up the pool in non-static cases should be the responsibility of the user.
> >>>>>> If they want a pool which is closed by a hook when the JVM exits, they should explicitly "say" so, for example by calling "newExitingWorkerPool".
> >>>>>>
> >>>>>> This is a behaviour change in the API, so I think we need feedback from the community before proceeding with it.
> >>>>>> What are your thoughts?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Peter
> >>>>>>
> >>>>>> 冯佳捷 <laputafa...@gmail.com> wrote on Fri, Sep 13, 2024 at 17:16:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> During the investigation of a metaspace memory leak issue in the Flink IcebergSource (https://github.com/apache/iceberg/pull/11073), a discussion with @pvary revealed that ThreadPools.newWorkerPool currently registers a shutdown hook via ExitingExecutorService for all created thread pools. While this ensures graceful shutdown of the pools when the JVM exits, it can lead to unnecessary shutdown hook accumulation, especially when the pool is explicitly closed within the application's lifecycle.
> >>>>>>>
> >>>>>>> I propose modifying ThreadPools.newWorkerPool to not register a shutdown hook by default. This would prevent issues where developers unintentionally register numerous shutdown hooks when using ThreadPools.newWorkerPool for short-lived thread pools.
> >>>>>>> To retain the existing functionality for long-lived thread pools that require a shutdown hook, I suggest introducing a new, more descriptive function, such as newExitingWorkerPool. This function would explicitly create thread pools that are registered with a shutdown hook.
> >>>>>>>
> >>>>>>> This change might impact users who rely on the implicit shutdown hook registration provided by the current ThreadPools.newWorkerPool implementation.
> >>>>>>> I would like to gather feedback from the community regarding this proposed change, especially regarding potential compatibility concerns.
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>> Feng Jiajie
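[Editor's note: the hook accumulation Feng Jiajie describes can be reproduced with a minimal sketch. This is not Iceberg's code: `newExitingWorkerPool` here is a hypothetical stand-in for the hook-registering behavior (Iceberg delegates to Guava's `MoreExecutors.getExitingExecutorService`), and the `registeredHooks` counter exists only to make the accumulation observable, since `java.lang.Runtime` exposes no way to count hooks.]

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of why a hook-per-pool design leaks: the Runtime retains every
// registered hook until JVM exit, even after the pool it guards is shut down.
public class ShutdownHookAccumulation {

  // Illustration-only counter; the real Runtime keeps its hooks private.
  static final AtomicInteger registeredHooks = new AtomicInteger();

  // Hypothetical factory mirroring the behavior under discussion: every
  // call registers one shutdown hook, and nothing ever removes it.
  public static ExecutorService newExitingWorkerPool(String prefix, int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    Runtime.getRuntime().addShutdownHook(new Thread(pool::shutdown, prefix + "-hook"));
    registeredHooks.incrementAndGet();
    return pool;
  }

  public static void main(String[] args) {
    // The short-lived-pool pattern from the report: each iteration closes
    // its pool, but the pool's shutdown hook stays registered.
    for (int i = 0; i < 5; i++) {
      ExecutorService pool = newExitingWorkerPool("short-lived-" + i, 2);
      pool.shutdown(); // pool is gone; its hook is not
    }
    // Five dead pools, five live hooks held until JVM exit. With per-job
    // classloaders (as in Flink), each retained hook can pin a classloader,
    // which is the metaspace leak that started this thread.
  }
}
```

Calling `shutdown()` on an already-terminated pool at JVM exit is a no-op, so the retained hooks are harmless individually; the problem is that they accumulate without bound.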