>  I don't think that solves the problem that these are used more widely
than intended and without people knowing the behavior.

Ryan, to solve this problem, I suggest we deprecate the current
`newWorkerPool` in favor of a new `newExitingWorkerPool`. This way, when people
call `newExitingWorkerPool`, the intended behavior is clear from the method name.
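
As a rough sketch (the exact names and signatures are just my assumption from
this thread, and the exiting body only mirrors what the current implementation
already does via Guava's `MoreExecutors.getExitingExecutorService`), the change
in `ThreadPools` could look like:

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ThreadPoolExecutor;
  import com.google.common.util.concurrent.MoreExecutors;
  import com.google.common.util.concurrent.ThreadFactoryBuilder;

  /** @deprecated kept for compatibility; use {@link #newExitingWorkerPool(String, int)}. */
  @Deprecated
  public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    return newExitingWorkerPool(namePrefix, poolSize);
  }

  /** Fixed-size daemon pool that registers a JVM shutdown hook (the current behavior). */
  public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
    return MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor)
            Executors.newFixedThreadPool(
                poolSize,
                new ThreadFactoryBuilder()
                    .setDaemon(true)
                    .setNameFormat(namePrefix + "-%d")
                    .build()));
  }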

On Fri, Sep 27, 2024 at 1:58 PM rdb...@gmail.com <rdb...@gmail.com> wrote:

> I'm okay with adding newFixedThreadPool as Steven suggests, but I don't
> think that solves the problem that these are used more widely than intended
> and without people knowing the behavior. Even though "non-exiting" is
> awkward, it is maybe a good option to call out behavior. +1 for Javadoc,
> and +1 for doing something here since there are improper uses throughout
> Iceberg. Thanks for raising this, Peter!
>
> On Thu, Sep 26, 2024 at 1:52 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Steven,
>>
>> I agree with you here. I think we can use semantics similar to
>> ThreadPoolExecutor/ScheduledThreadPoolExecutor (like
>> newFixedThreadPool, newWorkStealingPool, ...).
>>
>> Regards
>> JB
>>
>> On Thu, Sep 26, 2024 at 2:05 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >
>> >
>> > First, we should definitely add Javadoc to `ThreadPools.newWorkerPool`
>> on its behavior with a shutdown hook. It is not obvious from the method
>> name. I would actually go further and deprecate `newWorkerPool` in favor of
>> `newExitingWorkerPool`. The `newWorkerPool` method name invites misuse,
>> since the intention is not obvious from the name.
>> >
>> > `newNonExitingWorkerPool` is a little awkward to me. `NonExiting`
>> should be the default behavior. Maybe we can call this new method
>> `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can
>> just make `ThreadPools.newDaemonThreadFactory` public, as the proposed
>> `newNonExitingWorkerPool` really just saves one line on the thread factory
>> construction.
>> >
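>> > As a rough sketch (the method bodies here are just my guess based on the
>> > existing worker pool code, not an actual patch), the two alternatives
>> > could look like this, assuming java.util.concurrent and Guava's
>> > ThreadFactoryBuilder are on the classpath:
>> >
>> >   // Option 1: plain fixed-size daemon pool, no shutdown hook
>> >   public static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
>> >     return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(prefix));
>> >   }
>> >
>> >   // Option 2: only expose the daemon thread factory and let callers build the pool
>> >   public static ThreadFactory newDaemonThreadFactory(String prefix) {
>> >     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build();
>> >   }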
>> >
>> > On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <
>> peter.vary.apa...@gmail.com> wrote:
>> >>
>> >> Here are the cases where we call the `newWorkerPool` in our code:
>> >>
>> >> Correctly:
>> >>
>> >> S3FileIO.executorService
>> >> HadoopFileIO.executorService
>> >>
>> >> Incorrectly:
>> >>
>> >> CountersBenchmark.defaultCounterMultipleThreads (core module)
>> >> BaseDistributedDataScan.newMonitorPool (core module)
>> >> FlinkInputFormat.createInputSplits (flink module)
>> >> IcebergInputFormat.getSplits (flink module)
>> >>
>> >> Incorrectly, but typically called only once in the JVM lifecycle
>> >>
>> >> TableMigrationUtil.migrationService - the pool management is
>> abandoned, and nothing prevents multiple pool creations (data module)
>> >> IcebergCommitter<init> (flink module)
>> >> IcebergFilesCommitter.open (flink module)
>> >> IcebergSource.planSplitsForBatch (flink module)
>> >> StreamingMonitorFunction.open (flink module)
>> >> ContinuousSplitPlannerImpl<init> (flink module)
>> >> Coordinator<init> - Kafka coordinator - I'm not sure that this belongs
>> here (kafka-connect)
>> >>
>> >> The code we need to duplicate in the core/data/flink/kafka modules is:
>> >>
>> >>   public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>> >>     return Executors.newFixedThreadPool(
>> >>         poolSize,
>> >>         new ThreadFactoryBuilder()
>> >>             .setDaemon(true)
>> >>             .setNameFormat(namePrefix + "-%d")
>> >>             .build());
>> >>   }
>> >>
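>> >> A typical caller of such a pool would close it explicitly, so no shutdown
>> >> hook is needed (hypothetical usage, the prefix is made up):
>> >>
>> >>   ExecutorService pool = newNonExitingWorkerPool("iceberg-plan-worker", 4);
>> >>   try {
>> >>     // submit planning tasks and wait for them here ...
>> >>   } finally {
>> >>     pool.shutdown();  // explicit cleanup; no JVM shutdown hook involved
>> >>   }
>> >>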
>> >>
>> >> Maybe adding another utility method to the `ThreadPools` would help
>> future contributors to think twice about the need for using the `Exiting`
>> solution, so I would prefer to add this method to the core `ThreadPools`
>> with enough javadoc to highlight the intended usage.
>> >>
>> >> Thanks,
>> >> Peter
>> >>
>> >> rdb...@gmail.com <rdb...@gmail.com> wrote (on Wed, Sep 18, 2024, 23:26):
>> >>>
>> >>> I think this is the intended behavior. The code calls
>> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
>> exits. I think the right fix is for callers to create their own
>> `ExecutorService` rather than using `newWorkerPool`. That allows for
>> customization without making Iceberg more complicated. `ThreadPools` isn't
>> doing anything special here. It's just a convenience method to create an
>> exiting, fixed-size thread pool that runs daemon threads. If that's not
>> what you're looking for then isn't it reasonable to make your own
>> convenience method?
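>> >>>
>> >>> For what it's worth, the "exiting" part boils down to one JVM shutdown
>> >>> hook per pool. A simplified sketch of the Guava behavior (not the exact
>> >>> code) would be:
>> >>>
>> >>>   static ExecutorService exitingSketch(ThreadPoolExecutor executor) {
>> >>>     // register a hook that drains the pool when the JVM exits
>> >>>     Runtime.getRuntime()
>> >>>         .addShutdownHook(
>> >>>             new Thread(
>> >>>                 () -> {
>> >>>                   executor.shutdown();
>> >>>                   try {
>> >>>                     // Guava's default grace period is 120 seconds
>> >>>                     executor.awaitTermination(120, TimeUnit.SECONDS);
>> >>>                   } catch (InterruptedException e) {
>> >>>                     Thread.currentThread().interrupt();
>> >>>                   }
>> >>>                 }));
>> >>>     return executor;
>> >>>   }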
>> >>>
>> >>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <
>> peter.vary.apa...@gmail.com> wrote:
>> >>>>
>> >>>> This is not just a Flink issue, the calls are spread out across multiple
>> packages. We checked the code, and in many of the current use-cases in the
>> Iceberg repo the pool is not used in a static environment, and is closed
>> manually. In these cases we should switch to a thread pool without a
>> shutdown hook. So at a minimum I think we need to create a utility method to
>> create such a pool.
>> >>>>
>> >>>> The main question is:
>> >>>> - Is it a bug, or a feature, that we always provide a pool with a
>> hook?
>> >>>>
>> >>>> If this is a bug, then we create a "newExitingWorkerPool", and
>> change the callers to use the correct one.
>> >>>> If this is a feature, then we create a "newNotExitingWorkerPool"
>> (which is gross IMHO, but we should consider API compatibility), and change
>> the callers to use the correct one.
>> >>>>
>> >>>> Thanks,
>> >>>> Peter
>> >>>>
>> >>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com>
>> wrote:
>> >>>>>
>> >>>>> Since we're using standard interfaces, maybe we should just
>> document this behavior and you can control it by creating your own worker
>> pool instead?
>> >>>>>
>> >>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <
>> peter.vary.apa...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Bumping this thread a bit.
>> >>>>>>
>> >>>>>> Cleaning up the pool in non-static cases should be the
>> responsibility of the user. If they want a pool which is closed by a hook
>> when the JVM exits, they should explicitly "say" so, for example by calling
>> "newExitingWorkerPool".
>> >>>>>>
>> >>>>>> This is a behaviour change in the API, so I think we need feedback
>> from the community before proceeding with it.
>> >>>>>> What are your thoughts?
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Peter
>> >>>>>>
>> >>>>>> 冯佳捷 <laputafa...@gmail.com> wrote (on Fri, Sep 13, 2024, 17:16):
>> >>>>>>>
>> >>>>>>> Hi all,
>> >>>>>>>
>> >>>>>>> During the investigation of a metaspace memory leak issue in
>> Flink IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a
>> discussion with @pvary revealed that ThreadPools.newWorkerPool currently
>> registers a Shutdown Hook via ExitingExecutorService for all created thread
>> pools. While this ensures graceful shutdown of the pools when the JVM
>> exits, it might lead to unnecessary Shutdown Hook accumulation, especially
>> when the pool is explicitly closed within the application's lifecycle.
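>> >>>>>>>
>> >>>>>>> To illustrate the accumulation (a contrived sketch, not code from the
>> >>>>>>> PR): every short-lived pool created this way leaves its hook behind,
>> >>>>>>> even after the pool itself has been shut down.
>> >>>>>>>
>> >>>>>>>   for (int i = 0; i < 100; i++) {
>> >>>>>>>     ExecutorService pool = ThreadPools.newWorkerPool("split-planner", 4);
>> >>>>>>>     try {
>> >>>>>>>       // plan one batch of splits ...
>> >>>>>>>     } finally {
>> >>>>>>>       // the pool is gone, but its shutdown hook stays registered with the JVM
>> >>>>>>>       pool.shutdown();
>> >>>>>>>     }
>> >>>>>>>   }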
>> >>>>>>>
>> >>>>>>> I propose to modify ThreadPools.newWorkerPool to not register a
>> Shutdown Hook by default. This would prevent potential issues where
>> developers might unintentionally register numerous Shutdown Hooks when
>> using ThreadPools.newWorkerPool for short-lived thread pools.
>> >>>>>>> To retain the existing functionality for long-lived thread pools
>> that require a Shutdown Hook, I suggest introducing a new, more descriptive
>> function, such as newExitingWorkerPool. This function would explicitly
>> create thread pools that are registered with a Shutdown Hook.
>> >>>>>>>
>> >>>>>>> This change might potentially impact users who rely on the
>> implicit Shutdown Hook registration provided by the current
>> ThreadPools.newWorkerPool implementation.
>> >>>>>>> I would like to gather feedback from the community regarding this
>> proposed change, especially regarding potential compatibility concerns.
>> >>>>>>>
>> >>>>>>> Best regards,
>> >>>>>>> Feng Jiajie
>> >>>>>>>
>>
>
