I'm okay with adding newFixedThreadPool as Steven suggests, but I don't
think that solves the problem that these are used more widely than intended
and without people knowing the behavior. Even though "non-exiting" is
awkward, it is maybe a good option to call out behavior. +1 for Javadoc,
and +1 for doing something here since there are improper uses throughout
Iceberg. Thanks for raising this, Peter!

On Thu, Sep 26, 2024 at 1:52 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Steven,
>
> I agree with you here. I think we can use semantics similar to
> ThreadPoolExecutor/ScheduledThreadPoolExecutor (like
> newFixedThreadPool, newWorkStealingPool, ...).
>
> Regards
> JB
>
> On Thu, Sep 26, 2024 at 2:05 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> >
> > First, we should definitely add Javadoc to `ThreadPools.newWorkerPool`
> on its behavior with a shutdown hook. It is not obvious from the method
> name. I would actually go further to deprecate `newWorkerPool` with
> `newExitingWorkerPool`. `newWorkerPool` method name is easy to cause the
> misuage, as the intention is not obvious from the name.
> >
> > `newNonExitingWorkerPool` is a little awkward to me. `NonExiting` should
> be the default behavior. Maybe we can call this new method as
> `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can
> just make `ThreadPools.newDaemonThreadFactory` public as the proposed
> `newNonExitingWorkerPool` really just saved one line on the thread factory
> construction.
> >
> >
> > On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
> >>
> >> Here are the cases where we call the `newWorkerPool` in our code:
> >>
> >> Correctly:
> >>
> >> S3FileIO.executorService
> >> HadoopFileIO.executorService
> >>
> >> Incorrectly:
> >>
> >> CountersBenchmark.defaultCounterMultipleThreads (core module)
> >> BaseDistributedDataScan.newMonitorPool (core module)
> >> FlinkInputFormat.createInputSplits (flink module)
> >> IcebergInputFormat.getSplits (flink module)
> >>
> >> Incorrectly, but typically called only once in the JVM lifecycle
> >>
> >> TableMigrationUtil.migrationService - the pool management is abandoned,
> and nothing prevents multiple pool creations (data module)
> >> IcebergCommitter<init> (flink module)
> >> IcebergFilesCommitter.open (flink module)
> >> IcebergSource.planSplitsForBatch (flink module)
> >> StreamingMonitorFunction.open (flink module)
> >> ContinuousSplitPlannerImpl<init> (flink module)
> >> Coordinator<init> - Kafka coordinator - I'm not sure that this belongs
> to here (kafka-connect)
> >>
> >> The code we need to duplicate in core/data/flink/kafka module is:
> >>
> >>   public static ExecutorService newNonExitingWorkerPool(String
> namePrefix, int poolSize) {
> >>     return Executors.newFixedThreadPool(
> >>         poolSize,
> >>         new
> ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix +
> "-%d").build());
> >>   }
> >>
> >>
> >> Maybe adding another utility method to the `ThreadPools` would help
> future contributors to think twice about the need for using the `Exiting`
> solution, so I would prefer to add this method to the core `ThreadPools`
> with enough javadoc to highlight the intended usage.
> >>
> >> Thanks,
> >> Peter
> >>
> >> rdb...@gmail.com <rdb...@gmail.com> ezt írta (időpont: 2024. szept.
> 18., Sze, 23:26):
> >>>
> >>> I think this is the intended behavior. The code calls
> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
> exits. I think the right fix is for callers to create their own
> `ExecutorService` rather than using `newWorkerPool`. That allows for
> customization without making Iceberg more complicated. `ThreadPools` isn't
> doing anything special here. It's just a convenience method to create an
> exiting, fixed-size thread pool that runs daemon threads. If that's not
> what you're looking for then isn't it reasonable to make your own
> convenience method?
> >>>
> >>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <
> peter.vary.apa...@gmail.com> wrote:
> >>>>
> >>>> This is not just a Flink issue, tha calls are spread out in multiple
> packages. We checked the code, and in many of the current use-cases in the
> Iceberg repo the pool is not used in a static environment, and closed
> manually. In this cases we should switch to a thread pool without a
> shutdown hook. So I think minimally we need to create a utility method to
> create such a pool.
> >>>>
> >>>> The main question is:
> >>>> - Is it a bug, or a feature, that we always provide a pool with a
> hook?
> >>>>
> >>>> If this is a bug, then we create a "newExitingWorkerPool", and change
> the callers to use the correct one.
> >>>> If this is a feature, then we create a "newNotExitingWorkerPool"
> (which is gross IMHO, but we should consider API compatibility), and change
> the callers to use the correct one.
> >>>>
> >>>> Thanks,
> >>>> Peter
> >>>>
> >>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com>
> wrote:
> >>>>>
> >>>>> Since we're using standard interfaces, maybe we should just document
> this behavior and you can control it by creating your own worker pool
> instead?
> >>>>>
> >>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <
> peter.vary.apa...@gmail.com> wrote:
> >>>>>>
> >>>>>> Bumping this thread a bit.
> >>>>>>
> >>>>>> Cleaning up the pool in non-static cases should be a responsibility
> of the user. If they want a pool which is closed by a hook when the JVM
> exists they should explicitly "say" so, for example calling
> "newExitingWorkerPool".
> >>>>>>
> >>>>>> This is a behaviour change in the API, so I think we need feedback
> from the community before proceeding with it.
> >>>>>> What are your thoughts?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Peter
> >>>>>>
> >>>>>> 冯佳捷 <laputafa...@gmail.com> ezt írta (időpont: 2024. szept. 13.,
> P, 17:16):
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> During the investigation of a metaspace memory leak issue in Flink
> IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a
> discussion with @pvary revealed that ThreadPools.newWorkerPool currently
> registers a Shutdown Hook via ExitingExecutorService for all created thread
> pools. While this ensures graceful shutdown of the pools when the JVM
> exits, it might lead to unnecessary Shutdown Hook accumulation, especially
> when the pool is explicitly closed within the application's lifecycle.
> >>>>>>>
> >>>>>>> I propose to modify ThreadPools.newWorkerPool to not register a
> Shutdown Hook by default. This would prevent potential issues where
> developers might unintentionally register numerous Shutdown Hooks when
> using ThreadPools.newWorkerPool for short-lived thread pools.
> >>>>>>> To retain the existing functionality for long-lived thread pools
> that require a Shutdown Hook, I suggest introducing a new, more descriptive
> function, such as newExitingWorkerPool. This function would explicitly
> create thread pools that are registered with a Shutdown Hook.
> >>>>>>>
> >>>>>>> This change might potentially impact users who rely on the
> implicit Shutdown Hook registration provided by the current
> ThreadPools.newWorkerPool implementation.
> >>>>>>> I would like to gather feedback from the community regarding this
> proposed change, especially regarding potential compatibility concerns.
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>> Feng Jiajie
> >>>>>>>
>

Reply via email to