First, we should definitely add Javadoc to `ThreadPools.newWorkerPool`
describing its shutdown-hook behavior; it is not obvious from the method
name. I would actually go further and deprecate `newWorkerPool` in favor of
`newExitingWorkerPool`. The name `newWorkerPool` invites misuse, since the
intention is not apparent from the name.

`newNonExitingWorkerPool` is a little awkward to me; `NonExiting` should
be the default behavior. Maybe we can call this new method
`newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can
just make `ThreadPools.newDaemonThreadFactory` public, as the proposed
`newNonExitingWorkerPool` really just saves one line on the thread factory
construction.
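
For illustration, a plain-JDK sketch of what a public daemon thread factory
plus such a `newFixedThreadPool(int poolSize, String prefix)` convenience
could look like (the class name `WorkerPools` is hypothetical, and the
hand-rolled factory stands in for the Guava `ThreadFactoryBuilder` that
Iceberg actually uses):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public final class WorkerPools {
  private WorkerPools() {}

  // Hypothetical public equivalent of ThreadPools.newDaemonThreadFactory:
  // daemon threads named "<prefix>-0", "<prefix>-1", ...
  public static ThreadFactory newDaemonThreadFactory(String prefix) {
    AtomicInteger count = new AtomicInteger(0);
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + count.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // Fixed-size pool of daemon threads with NO JVM shutdown hook; the
  // caller owns the lifecycle and must call shutdown() explicitly.
  public static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
    return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(prefix));
  }
}
```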


On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Here are the cases where we call the `newWorkerPool` in our code:
>
>    - Correctly:
>       - S3FileIO.executorService
>       - HadoopFileIO.executorService
>    - Incorrectly:
>       - CountersBenchmark.defaultCounterMultipleThreads (core module)
>       - BaseDistributedDataScan.newMonitorPool (core module)
>       - FlinkInputFormat.createInputSplits (flink module)
>       - IcebergInputFormat.getSplits (flink module)
>    - Incorrectly, but typically called only once in the JVM lifecycle
>       - TableMigrationUtil.migrationService - the pool management is
>       abandoned, and nothing prevents multiple pool creations (data module)
>       - IcebergCommitter<init> (flink module)
>       - IcebergFilesCommitter.open (flink module)
>       - IcebergSource.planSplitsForBatch (flink module)
>       - StreamingMonitorFunction.open (flink module)
>       - ContinuousSplitPlannerImpl<init> (flink module)
>       - Coordinator<init> - Kafka coordinator - I'm not sure that this
>       belongs here (kafka-connect)
>
> The code we need to duplicate in core/data/flink/kafka module is:
>
>   public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>     return Executors.newFixedThreadPool(
>         poolSize,
>         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
>   }
>
>
> Maybe adding another utility method to the `ThreadPools` would help
> future contributors to think twice about the need for using the `Exiting`
> solution, so I would prefer to add this method to the core `ThreadPools`
> with enough javadoc to highlight the intended usage.
>
> Thanks,
> Peter
>
> rdb...@gmail.com <rdb...@gmail.com> wrote (on Wed, Sep 18, 2024, at
> 23:26):
>
>> I think this is the intended behavior. The code calls
>> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
>> exits. I think the right fix is for callers to create their own
>> `ExecutorService` rather than using `newWorkerPool`. That allows for
>> customization without making Iceberg more complicated. `ThreadPools` isn't
>> doing anything special here. It's just a convenience method to create an
>> exiting, fixed-size thread pool that runs daemon threads. If that's not
>> what you're looking for then isn't it reasonable to make your own
>> convenience method?
>>
>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> This is not just a Flink issue; the calls are spread across multiple
>>> packages. We checked the code, and in many of the current use cases in the
>>> Iceberg repo the pool is not used in a static context and is closed
>>> manually. In these cases we should switch to a thread pool without a
>>> shutdown hook. So I think at a minimum we need to create a utility method
>>> to create such a pool.
>>>
>>> The main question is:
>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>>
>>> If this is a bug, then we create a "newExitingWorkerPool", and change
>>> the callers to use the correct one.
>>> If this is a feature, then we create a "newNotExitingWorkerPool" (which
>>> is gross IMHO, but we should consider API compatibility), and change the
>>> callers to use the correct one.
>>>
>>> Thanks,
>>> Peter
>>>
>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>
>>>> Since we're using standard interfaces, maybe we should just document
>>>> this behavior and you can control it by creating your own worker pool
>>>> instead?
>>>>
>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Bumping this thread a bit.
>>>>>
>>>>> Cleaning up the pool in non-static cases should be the responsibility of
>>>>> the user. If they want a pool which is closed by a hook when the JVM
>>>>> exits, they should explicitly "say" so, for example by calling
>>>>> "newExitingWorkerPool".
>>>>>
>>>>> This is a behaviour change in the API, so I think we need feedback
>>>>> from the community before proceeding with it.
>>>>> What are your thoughts?
>>>>>
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>> 冯佳捷 <laputafa...@gmail.com> wrote (on Fri, Sep 13, 2024, at
>>>>> 17:16):
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> During the investigation of a metaspace memory leak issue in Flink
>>>>>> IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a
>>>>>> discussion with @pvary revealed that *ThreadPools.newWorkerPool*
>>>>>> currently registers a Shutdown Hook via ExitingExecutorService for all
>>>>>> created thread pools. While this ensures graceful shutdown of the pools
>>>>>> when the JVM exits, it might lead to unnecessary Shutdown Hook
>>>>>> accumulation, especially when the pool is explicitly closed within the
>>>>>> application's lifecycle.
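
The accumulation is easy to reproduce without Guava: each exiting-style pool
registers one JVM shutdown hook, and that hook stays registered even after
the pool itself is shut down. A minimal sketch (this mirrors, but is not,
Guava's `MoreExecutors.getExitingExecutorService`; names are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HookLeakSketch {
  // Roughly what an "exiting" wrapper does on every call: register a
  // JVM shutdown hook that drains the pool when the JVM exits.
  static Thread registerExitingHook(ExecutorService pool) {
    Thread hook = new Thread(() -> {
      pool.shutdown();
      try {
        pool.awaitTermination(120, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }

  // Returns true if the hook is still registered after the pool was
  // already shut down, i.e. the hook (and everything it references,
  // such as a Flink user classloader) is kept alive until JVM exit.
  static boolean hookOutlivesPool() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    Thread hook = registerExitingHook(pool);
    pool.shutdown(); // the short-lived pool is done here...
    // ...but removeShutdownHook still finds the hook registered:
    return Runtime.getRuntime().removeShutdownHook(hook);
  }

  public static void main(String[] args) {
    System.out.println("hook outlives pool: " + hookOutlivesPool());
  }
}
```

Creating such a pool per split-planning call, as some of the Flink call
sites do, therefore leaves one stale hook behind per call.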
>>>>>>
>>>>>> I propose to *modify ThreadPools.newWorkerPool to not register a
>>>>>> Shutdown Hook by default*. This would prevent potential issues where
>>>>>> developers might unintentionally register numerous Shutdown Hooks when
>>>>>> using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>>> To retain the existing functionality for long-lived thread pools that
>>>>>> require a Shutdown Hook, I suggest introducing a new, more descriptive
>>>>>> function, such as *newExitingWorkerPool*. This function would
>>>>>> explicitly create thread pools that are registered with a Shutdown Hook.
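
The proposed split could look roughly like this (sketched with a plain-JDK
daemon factory and shutdown hook instead of Guava's
`ThreadFactoryBuilder`/`getExitingExecutorService`; the class name and
implementation details here are illustrative, not Iceberg's actual code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class ThreadPoolsSketch {
  private ThreadPoolsSketch() {}

  /** Daemon-thread pool with NO shutdown hook; the caller must close it. */
  public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    return Executors.newFixedThreadPool(poolSize, daemonFactory(namePrefix));
  }

  /** Same pool, plus a JVM shutdown hook that drains it on exit. */
  public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
    ExecutorService pool = newWorkerPool(namePrefix, poolSize);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      pool.shutdown();
      try {
        pool.awaitTermination(120, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }));
    return pool;
  }

  private static ThreadFactory daemonFactory(String prefix) {
    AtomicInteger count = new AtomicInteger(0);
    return r -> {
      Thread t = new Thread(r, prefix + "-" + count.getAndIncrement());
      t.setDaemon(true);
      return t;
    };
  }
}
```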
>>>>>>
>>>>>> *This change might potentially impact users who rely on the implicit
>>>>>> Shutdown Hook registration provided by the current
>>>>>> ThreadPools.newWorkerPool implementation.*
>>>>>> I would like to gather feedback from the community regarding this
>>>>>> proposed change, especially regarding potential compatibility concerns.
>>>>>>
>>>>>> Best regards,
>>>>>> Feng Jiajie
>>>>>>
>>>>>>
