Here are the cases where we call `newWorkerPool` in our code (a sketch of
the problematic pattern follows the list):

   - Correctly:
      - S3FileIO.executorService
      - HadoopFileIO.executorService
   - Incorrectly:
      - CountersBenchmark.defaultCounterMultipleThreads (core module)
      - BaseDistributedDataScan.newMonitorPool (core module)
      - FlinkInputFormat.createInputSplits (flink module)
      - IcebergInputFormat.getSplits (flink module)
   - Incorrectly, but typically called only once in the JVM lifecycle:
      - TableMigrationUtil.migrationService - the pool management is
      abandoned, and nothing prevents multiple pool creations (data module)
      - IcebergCommitter<init> (flink module)
      - IcebergFilesCommitter.open (flink module)
      - IcebergSource.planSplitsForBatch (flink module)
      - StreamingMonitorFunction.open (flink module)
      - ContinuousSplitPlannerImpl<init> (flink module)
      - Coordinator<init> - Kafka coordinator - I'm not sure that this
      belongs here (kafka-connect)
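
For illustration, here is a hedged sketch of the problematic pattern (the
pool name and size are made up): every `newWorkerPool` call registers a JVM
shutdown hook, and shutting the pool down manually does not remove it.

   ExecutorService pool = ThreadPools.newWorkerPool("short-lived-planner", 4);
   try {
     // submit planning tasks here ...
   } finally {
     pool.shutdown(); // the pool terminates, but its shutdown hook stays registered
   }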

The code we need to duplicate in the core/data/flink/kafka-connect modules is:

  // needs java.util.concurrent.ExecutorService/Executors and the
  // ThreadFactoryBuilder from Iceberg's relocated Guava
  public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
    return Executors.newFixedThreadPool(
        poolSize,
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
  }


Maybe adding another utility method to `ThreadPools` would help future
contributors think twice about whether they really need the `Exiting`
behavior, so I would prefer to add this method to the core `ThreadPools`
with enough javadoc to highlight the intended usage.
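
For example, the javadoc could read something like this (the wording is only
a first attempt, and the method is the one from the snippet above):

   /**
    * Creates a fixed-size pool of daemon threads WITHOUT registering a JVM
    * shutdown hook. The caller owns the pool's lifecycle and must shut it
    * down explicitly (for example in close()). Use
    * {@link #newWorkerPool(String, int)} only for pools that are expected to
    * live until the JVM exits, since that method registers a shutdown hook
    * which is never removed.
    */
   public static ExecutorService newNonExitingWorkerPool(String namePrefix, int poolSize) {
     return Executors.newFixedThreadPool(
         poolSize,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build());
   }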

Thanks,
Peter

rdb...@gmail.com <rdb...@gmail.com> wrote (on Wed, Sep 18, 2024, 23:26):

> I think this is the intended behavior. The code calls
> `MoreExecutors.getExitingExecutorService` internally to ensure the pool
> exits. I think the right fix is for callers to create their own
> `ExecutorService` rather than using `newWorkerPool`. That allows for
> customization without making Iceberg more complicated. `ThreadPools` isn't
> doing anything special here. It's just a convenience method to create an
> exiting, fixed-size thread pool that runs daemon threads. If that's not
> what you're looking for, then isn't it reasonable to make your own
> convenience method?
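>
> For reference, today's behavior boils down to roughly this (paraphrased,
> not the exact Iceberg source):
>
>   public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
>     // getExitingExecutorService registers a JVM shutdown hook that waits
>     // for the pool to terminate when the JVM exits
>     return MoreExecutors.getExitingExecutorService(
>         (ThreadPoolExecutor)
>             Executors.newFixedThreadPool(
>                 poolSize,
>                 new ThreadFactoryBuilder()
>                     .setDaemon(true)
>                     .setNameFormat(namePrefix + "-%d")
>                     .build()));
>   }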
>
> On Wed, Sep 18, 2024 at 1:22 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> This is not just a Flink issue; the calls are spread across multiple
>> packages. We checked the code, and in many of the current use cases in the
>> Iceberg repo the pool is not used in a static context and is closed
>> manually. In these cases we should switch to a thread pool without a
>> shutdown hook. So I think, at a minimum, we need to create a utility
>> method to create such a pool.
>>
>> The main question is:
>> - Is it a bug, or a feature, that we always provide a pool with a hook?
>>
>> If this is a bug, then we create a "newExitingWorkerPool", and change the
>> callers to use the correct one.
>> If this is a feature, then we create a "newNotExitingWorkerPool" (which
>> is gross IMHO, but we should consider API compatibility), and change the
>> callers to use the correct one.
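>>
>> Whichever way we decide, one compatibility-friendly sketch (method names
>> as above, nothing final) is to keep the old entry point as a deprecated
>> alias so callers are forced to state their intent:
>>
>>   /** Same behavior as today's newWorkerPool: daemon threads plus a shutdown hook. */
>>   public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
>>     return MoreExecutors.getExitingExecutorService(
>>         (ThreadPoolExecutor) Executors.newFixedThreadPool(
>>             poolSize,
>>             new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build()));
>>   }
>>
>>   /** @deprecated use an explicit variant so the shutdown-hook choice is visible. */
>>   @Deprecated
>>   public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
>>     return newExitingWorkerPool(namePrefix, poolSize);
>>   }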
>>
>> Thanks,
>> Peter
>>
>> On Wed, Sep 18, 2024, 21:53 rdb...@gmail.com <rdb...@gmail.com> wrote:
>>
>>> Since we're using standard interfaces, maybe we should just document
>>> this behavior and you can control it by creating your own worker pool
>>> instead?
>>>
>>> On Tue, Sep 17, 2024 at 2:20 AM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> Bumping this thread a bit.
>>>>
>>>> Cleaning up the pool in non-static cases should be the responsibility of
>>>> the user. If they want a pool which is closed by a hook when the JVM exits,
>>>> they should explicitly "say" so, for example by calling
>>>> "newExitingWorkerPool".
>>>>
>>>> This is a behaviour change in the API, so I think we need feedback from
>>>> the community before proceeding with it.
>>>> What are your thoughts?
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> 冯佳捷 <laputafa...@gmail.com> wrote (on Fri, Sep 13, 2024, 17:16):
>>>>
>>>>> Hi all,
>>>>>
>>>>> During the investigation of a metaspace memory leak issue in Flink
>>>>> IcebergSource ( https://github.com/apache/iceberg/pull/11073 ), a
>>>>> discussion with @pvary revealed that *ThreadPools.newWorkerPool*
>>>>> currently registers a Shutdown Hook via ExitingExecutorService for all
>>>>> created thread pools. While this ensures graceful shutdown of the pools
>>>>> when the JVM exits, it might lead to unnecessary Shutdown Hook
>>>>> accumulation, especially when the pool is explicitly closed within the
>>>>> application's lifecycle.
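>>>>>
>>>>> A small illustration of the accumulation (pool name and loop count are
>>>>> made up):
>>>>>
>>>>>   for (int i = 0; i < 1000; i++) {
>>>>>     ExecutorService pool = ThreadPools.newWorkerPool("short-lived", 2);
>>>>>     pool.shutdown(); // pool terminates, but its shutdown hook remains
>>>>>   }
>>>>>   // The JVM now holds ~1000 registered hooks until exit; if the pools
>>>>>   // were created from a child classloader (e.g. a restarted Flink job),
>>>>>   // the hooks keep that classloader reachable, so its metaspace cannot
>>>>>   // be reclaimed.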
>>>>>
>>>>> I propose to *modify ThreadPools.newWorkerPool to not register a
>>>>> Shutdown Hook by default*. This would prevent potential issues where
>>>>> developers might unintentionally register numerous Shutdown Hooks when
>>>>> using ThreadPools.newWorkerPool for short-lived thread pools.
>>>>> To retain the existing functionality for long-lived thread pools that
>>>>> require a Shutdown Hook, I suggest introducing a new, more descriptive
>>>>> function, such as *newExitingWorkerPool*. This function would
>>>>> explicitly create thread pools that are registered with a Shutdown Hook.
>>>>>
>>>>> *This change might potentially impact users who rely on the implicit
>>>>> Shutdown Hook registration provided by the current
>>>>> ThreadPools.newWorkerPool implementation.*
>>>>> I would like to gather feedback from the community regarding this
>>>>> proposed change, especially regarding potential compatibility concerns.
>>>>>
>>>>> Best regards,
>>>>> Feng Jiajie
>>>>>
>>>>>
