This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 74cafba6e6c6 [HUDI-9795] Use dedicated ForkJoinPool for parallel execution in HoodieFlinkEngineContext (#13853)
74cafba6e6c6 is described below

commit 74cafba6e6c65d0ea25f84e84165f671d040df9e
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Sep 12 15:10:43 2025 +0800

    [HUDI-9795] Use dedicated ForkJoinPool for parallel execution in HoodieFlinkEngineContext (#13853)
---
 .../client/common/HoodieFlinkEngineContext.java    | 54 ++++++++++++++++------
 1 file changed, 41 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index aee7a2d904ca..8c45c9c35f9d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
@@ -54,6 +55,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -109,16 +112,19 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
 
   @Override
   public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int 
parallelism) {
-    return 
data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList());
+    return executeParallelStream(data.parallelStream(), stream -> 
stream.map(throwingMapWrapper(func)).collect(Collectors.toList()), parallelism);
   }
 
   @Override
   public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, 
SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, 
V> reduceFunc, int parallelism) {
-    return 
data.stream().parallel().map(throwingMapToPairWrapper(mapToPairFunc))
-        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
-        .map(list -> list.stream().map(e -> 
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
-        .filter(Objects::nonNull)
-        .collect(Collectors.toList());
+    return executeParallelStream(
+        data.parallelStream(),
+        stream -> stream.map(throwingMapToPairWrapper(mapToPairFunc))
+            .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
+            .map(list -> list.stream().map(e -> 
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList()),
+        parallelism);
   }
 
   @Override
@@ -135,16 +141,21 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
   @Override
   public <I, K, V> List<V> reduceByKey(
       List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int 
parallelism) {
-    return data.stream().parallel()
-        .collect(Collectors.groupingBy(p -> p.getKey())).values().stream()
-        .map(list -> list.stream().map(e -> 
e.getValue()).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
-        .filter(Objects::nonNull)
-        .collect(Collectors.toList());
+    return executeParallelStream(
+        data.parallelStream(),
+        stream -> 
stream.collect(Collectors.groupingBy(Pair::getKey)).values().stream()
+            .map(list -> 
list.stream().map(Pair::getValue).reduce(throwingReduceWrapper(reduceFunc)).orElse(null))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList()),
+        parallelism);
   }
 
   @Override
   public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, 
Stream<O>> func, int parallelism) {
-    return 
data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
+    return executeParallelStream(
+        data.parallelStream(),
+        stream -> 
stream.flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList()),
+        parallelism);
   }
 
   @Override
@@ -154,7 +165,24 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
 
   @Override
   public <I, K, V> Map<K, V> mapToPair(List<I> data, 
SerializablePairFunction<I, K, V> func, Integer parallelism) {
-    return 
data.stream().parallel().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft,
 Pair::getRight));
+    return executeParallelStream(
+        data.parallelStream(),
+        stream -> 
stream.map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft,
 Pair::getRight)),
+        parallelism);
+  }
+
+  /**
+   * Execute a parallel stream with a dedicated ForkJoinPool.
+   */
+  private static <E, O> O executeParallelStream(Stream<E> paralelStream, 
Function<Stream<E>, O> transform, int parallelism) {
+    ForkJoinPool pool = new ForkJoinPool(parallelism);
+    try {
+      return pool.submit(() -> transform.apply(paralelStream)).get();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to execute parallel stream with 
dedicated ForkJoinPool.", e);
+    } finally {
+      pool.shutdown();
+    }
   }
 
   @Override

Reply via email to