Hi Steven,

I agree with you here. I think we can use semantics similar to
ThreadPoolExecutor/ScheduledThreadPoolExecutor (like
newFixedThreadPool, newWorkStealingPool, ...).
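
For illustration only, here is a minimal sketch of what `Executors`-style naming could look like. It assumes a hypothetical helper class `NamedPools` (not the existing Iceberg `ThreadPools` API) and uses only the JDK, so the thread factory is a plain stand-in for whatever builder the real code would use. No shutdown hook is registered, which leaves the lifecycle with the caller.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Iceberg: method names follow java.util.concurrent.Executors.
final class NamedPools {
  private NamedPools() {}

  // Creates daemon threads named "<prefix>-<n>" (assumed naming scheme for this sketch).
  static ThreadFactory newDaemonThreadFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  // Fixed-size pool without a shutdown hook: the caller must shut it down.
  static ExecutorService newFixedThreadPool(int poolSize, String prefix) {
    return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(prefix));
  }
}
```

A caller would create the pool with `NamedPools.newFixedThreadPool(4, "planner")` and call `shutdown()` on it when the owning component closes.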

Regards
JB

On Thu, Sep 26, 2024 at 2:05 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>
> First, we should definitely add Javadoc to `ThreadPools.newWorkerPool` on its
> behavior with a shutdown hook. It is not obvious from the method name. I
> would actually go further and deprecate `newWorkerPool` in favor of
> `newExitingWorkerPool`. The `newWorkerPool` name invites misuse, because the
> intention is not obvious from the name.
>
> `newNonExitingWorkerPool` is a little awkward to me. `NonExiting` should be
> the default behavior. Maybe we can call this new method
> `newFixedThreadPool(int poolSize, String prefix)`. Alternatively, we can just
> make `ThreadPools.newDaemonThreadFactory` public, as the proposed
> `newNonExitingWorkerPool` really just saves one line on the thread factory
> construction.
>
>
> On Wed, Sep 18, 2024 at 10:25 PM Péter Váry <peter.vary.apa...@gmail.com> 
> wrote:
>>
>> Here are the cases where we call the `newWorkerPool` in our code:
>>
>> Correctly:
>>
>> S3FileIO.executorService
>> HadoopFileIO.executorService
>>
>> Incorrectly:
>>
>> CountersBenchmark.defaultCounterMultipleThreads (core module)
>> BaseDistributedDataScan.newMonitorPool (core module)
>> FlinkInputFormat.createInputSplits (flink module)
>> IcebergInputFormat.getSplits (flink module)
>>
>> Incorrectly, but typically called only once in the JVM lifecycle:
>>
>> TableMigrationUtil.migrationService - the pool management is abandoned, and 
>> nothing prevents multiple pool creations (data module)
>> IcebergCommitter<init> (flink module)
>> IcebergFilesCommitter.open (flink module)
>> IcebergSource.planSplitsForBatch (flink module)
>> StreamingMonitorFunction.open (flink module)
>> ContinuousSplitPlannerImpl<init> (flink module)
>> Coordinator<init> - Kafka coordinator - I'm not sure that this belongs
>> here (kafka-connect)
>>
>> The code we need to duplicate in core/data/flink/kafka module is:
>>
>>   public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
>>     return Executors.newFixedThreadPool(
>>         poolSize,
>>         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
>>   }
>>
>>
>> Maybe adding another utility method to the `ThreadPools` would help future 
>> contributors to think twice about the need for using the `Exiting` solution, 
>> so I would prefer to add this method to the core `ThreadPools` with enough 
>> javadoc to highlight the intended usage.
>>
>> Thanks,
>> Peter
>>
>> rdb...@gmail.com <rdb...@gmail.com> wrote (on Wed, Sep 18, 2024, 23:26):
>>>
>>> I think this is the intended behavior. The code calls 
>>> `MoreExecutors.getExitingExecutorService` internally to ensure the pool 
>>> exits. I think the right fix is for callers to create their own 
>>> `ExecutorService` rather than using `newWorkerPool`. That allows for 
>>> customization without making Iceberg more complicated. `ThreadPools` isn't 
>>> doing anything special here. It's just a convenience method to create an 
>>> exiting, fixed-size thread pool that runs daemon threads. If that's not 
>>> what you're looking for then isn't it reasonable to make your own 
>>> convenience method?
>>>
>>> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com> 
>>> wrote:
>>>>
>>>> This is not just a Flink issue; the calls are spread out across multiple
>>>> packages. We checked the code, and in many of the current use-cases in the
>>>> Iceberg repo the pool is not used in a static environment and is closed
>>>> manually. In these cases we should switch to a thread pool without a
>>>> shutdown hook. So I think minimally we need to create a utility method to
>>>> create such a pool.
>>>>
>>>> The main question is:
>>>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>>>
>>>> If this is a bug, then we create a "newExitingWorkerPool", and change the 
>>>> callers to use the correct one.
>>>> If this is a feature, then we create a "newNotExitingWorkerPool" (which is 
>>>> gross IMHO, but we should consider API compatibility), and change the 
>>>> callers to use the correct one.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>>>>
>>>>> Since we're using standard interfaces, maybe we should just document this 
>>>>> behavior and you can control it by creating your own worker pool instead?
>>>>>
>>>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com> 
>>>>> wrote:
>>>>>>
>>>>>> Bumping this thread a bit.
>>>>>>
>>>>>> Cleaning up the pool in non-static cases should be a responsibility of
>>>>>> the user. If they want a pool which is closed by a hook when the JVM
>>>>>> exits, they should explicitly "say" so, for example by calling
>>>>>> "newExitingWorkerPool".
>>>>>>
>>>>>> This is a behaviour change in the API, so I think we need feedback from 
>>>>>> the community before proceeding with it.
>>>>>> What are your thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> 冯佳捷 <laputafa...@gmail.com> wrote (on Fri, Sep 13, 2024, 17:16):
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> During the investigation of a metaspace memory leak issue in Flink 
>>>>>>> IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a 
>>>>>>> discussion with @pvary revealed that ThreadPools.newWorkerPool 
>>>>>>> currently registers a Shutdown Hook via ExitingExecutorService for all 
>>>>>>> created thread pools. While this ensures graceful shutdown of the pools 
>>>>>>> when the JVM exits, it might lead to unnecessary Shutdown Hook 
>>>>>>> accumulation, especially when the pool is explicitly closed within the 
>>>>>>> application's lifecycle.
>>>>>>>
>>>>>>> I propose to modify ThreadPools.newWorkerPool to not register a 
>>>>>>> Shutdown Hook by default. This would prevent potential issues where 
>>>>>>> developers might unintentionally register numerous Shutdown Hooks when 
>>>>>>> using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>>>> To retain the existing functionality for long-lived thread pools that 
>>>>>>> require a Shutdown Hook, I suggest introducing a new, more descriptive 
>>>>>>> function, such as newExitingWorkerPool. This function would explicitly 
>>>>>>> create thread pools that are registered with a Shutdown Hook.
>>>>>>>
>>>>>>> This change might potentially impact users who rely on the implicit 
>>>>>>> Shutdown Hook registration provided by the current 
>>>>>>> ThreadPools.newWorkerPool implementation.
>>>>>>> I would like to gather feedback from the community regarding this 
>>>>>>> proposed change, especially regarding potential compatibility concerns.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Feng Jiajie
>>>>>>>
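
To illustrate the accumulation described in the original report, here is a minimal JDK-only sketch. The class `HookedPool` is hypothetical and is not how Guava's `MoreExecutors.getExitingExecutorService` or Iceberg's `ThreadPools` is implemented; it only shows that a shutdown hook registered for a pool stays registered after the pool itself is closed, unless it is removed explicitly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an "exiting" pool; an assumption made for this sketch only.
final class HookedPool {
  final ExecutorService pool;
  final Thread hook;

  HookedPool(int poolSize) {
    this.pool = Executors.newFixedThreadPool(poolSize);
    // The hook thread references the pool until the JVM exits or the hook is removed.
    this.hook = new Thread(pool::shutdown, "hooked-pool-shutdown");
    Runtime.getRuntime().addShutdownHook(hook);
  }

  // Shuts down the pool but leaves the hook registered.
  void closePoolOnly() {
    pool.shutdown();
  }

  // Returns true if the hook was still registered and has now been removed.
  boolean removeHook() {
    return Runtime.getRuntime().removeShutdownHook(hook);
  }
}
```

In this sketch, calling `closePoolOnly()` on every short-lived pool would leave one registered hook per pool behind, which is the kind of buildup a non-exiting factory method avoids.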
