This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch remote-partitioner
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c9af6a148b6af156bea74e05ee52b24d06c2d8f6
Author: YueZhang <[email protected]>
AuthorDate: Tue May 6 11:53:39 2025 +0800

    Add remote partitioner for bucket index bulk insert, served by the embedded timeline service
---
 .../client/embedded/EmbeddedTimelineService.java   |  4 +
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  5 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +
 .../BucketIndexBulkInsertPartitionerWithRows.java  | 21 ++++-
 .../apache/spark/sql/BucketPartitionUtils.scala    | 34 +++++---
 .../hudi/common/util/RemotePartitionHelper.java    | 90 ++++++++++++++++++++++
 .../DatasetBucketRescaleCommitActionExecutor.java  |  2 +-
 .../hudi/functional/TestBucketIndexSupport.scala   | 41 +++++++++-
 .../hudi/timeline/service/RequestHandler.java      | 20 +++++
 .../hudi/timeline/service/TimelineService.java     | 10 +++
 .../service/handlers/RemotePartitionerHandler.java | 59 ++++++++++++++
 11 files changed, 275 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index a28b2235e3a..de306e8130b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -170,6 +170,10 @@ public class EmbeddedTimelineService {
                   * writeConfig.getHoodieClientHeartbeatTolerableMisses());
     }
 
+    if (writeConfig.isUsingRemotePartitioner()) {
+      timelineServiceConfBuilder.enableRemotePartitioner(true);
+    }
+
     if (writeConfig.isTimelineServerBasedInstantStateEnabled()) {
       timelineServiceConfBuilder
           
.instantStateForceRefreshRequestNumber(writeConfig.getTimelineServerBasedInstantStateForceRefreshRequestNumber())
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 6458c02daaa..d6a13b81c0f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -287,6 +287,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       .withDocumentation("Only applies if index type is BUCKET. Determine the 
number of buckets in the hudi table, "
           + "and each partition is divided to N buckets.");
 
+  public static final ConfigProperty<Boolean> BUCKET_PARTITIONER = 
ConfigProperty
+      .key("hoodie.bucket.remote.partitioner.enable")
+      .defaultValue(true)
+      .withDocumentation("Use Remote Partitioner to do repartition based on 
bucket.");
+
   public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_RULE_TYPE 
= ConfigProperty
       .key("hoodie.bucket.index.partition.rule.type")
       .defaultValue(PartitionBucketIndexRule.REGEX.name)
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 770f0535d00..19ce90bbb5d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2281,6 +2281,10 @@ public class HoodieWriteConfig extends HoodieConfig {
   public String getRecordIndexInputStorageLevel() {
     return 
getStringOrDefault(HoodieIndexConfig.RECORD_INDEX_INPUT_STORAGE_LEVEL_VALUE);
   }
+  
+  public boolean isUsingRemotePartitioner() {
+    return getBoolean(HoodieIndexConfig.BUCKET_PARTITIONER);
+  }
 
   /**
    * storage properties.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index db7a64de69f..9816febb041 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -19,10 +19,12 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
+import org.apache.spark.Partitioner;
 import org.apache.spark.sql.BucketPartitionUtils$;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -34,21 +36,34 @@ public class BucketIndexBulkInsertPartitionerWithRows 
implements BulkInsertParti
 
   private final String indexKeyFields;
   private final NumBucketsFunction numBucketsFunction;
+  private final HoodieWriteConfig writeConfig;
+  private FileSystemViewStorageConfig viewConfig;
 
   public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, 
HoodieWriteConfig writeConfig) {
     this.indexKeyFields = indexKeyFields;
     this.numBucketsFunction = NumBucketsFunction.fromWriteConfig(writeConfig);
+    this.writeConfig = writeConfig;
+    if (writeConfig.isUsingRemotePartitioner()) {
+      this.viewConfig = writeConfig.getViewStorageConfig();
+    }
   }
 
-  public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, 
PartitionBucketIndexHashingConfig hashingConfig) {
-    this.indexKeyFields = indexKeyFields;
+  public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig 
writeConfig, PartitionBucketIndexHashingConfig hashingConfig) {
+    this.indexKeyFields = writeConfig.getBucketIndexHashFieldWithDefault();
     this.numBucketsFunction = new 
NumBucketsFunction(hashingConfig.getExpressions(),
         hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber());
+    this.writeConfig = writeConfig;
+    if (writeConfig.isUsingRemotePartitioner()) {
+      this.viewConfig = writeConfig.getViewStorageConfig();
+    }
   }
 
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputPartitions) {
-    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, 
numBucketsFunction, outputPartitions);
+    Partitioner partitioner = writeConfig.isUsingRemotePartitioner() && 
writeConfig.isEmbeddedTimelineServerEnabled()
+        ? BucketPartitionUtils$.MODULE$.getRemotePartitioner(viewConfig, 
numBucketsFunction, outputPartitions) 
+        : 
BucketPartitionUtils$.MODULE$.getLocalePartitioner(numBucketsFunction, 
outputPartitions);
+    return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, 
numBucketsFunction, partitioner);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
index ecdf406871f..9e3784cf0ca 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -19,7 +19,8 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.util.Functions
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
+import org.apache.hudi.common.util.{Functions, RemotePartitionHelper}
 import org.apache.hudi.common.util.hash.BucketIndexUtil
 import org.apache.hudi.index.bucket.BucketIdentifier
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction
@@ -28,7 +29,7 @@ import org.apache.spark.Partitioner
 import org.apache.spark.sql.catalyst.InternalRow
 
 object BucketPartitionUtils {
-  def createDataFrame(df: DataFrame, indexKeyFields: String, 
numBucketsFunction: NumBucketsFunction, partitionNum: Int): DataFrame = {
+  def createDataFrame(df: DataFrame, indexKeyFields: String, 
numBucketsFunction: NumBucketsFunction, partitioner: Partitioner): DataFrame = {
     def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
       val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
       val kb = BucketIdentifier
@@ -42,8 +43,29 @@ object BucketPartitionUtils {
     }
 
     val getPartitionKey = getPartitionKeyExtractor()
-    val partitioner = new Partitioner {
+    // use internalRow to avoid extra convert.
+    val reRdd = df.queryExecution.toRdd
+      .keyBy(row => getPartitionKey(row))
+      .repartitionAndSortWithinPartitions(partitioner)
+      .values
+    df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
+  }
+
+  def getRemotePartitioner(viewConf: FileSystemViewStorageConfig, 
numBucketsFunction: NumBucketsFunction, partitionNum: Int): Partitioner = {
+    new Partitioner {
+      private val helper = new RemotePartitionHelper(viewConf)
+
+      override def numPartitions: Int = partitionNum
+
+      override def getPartition(value: Any): Int = {
+        val partitionKeyPair = value.asInstanceOf[(String, Int)]
+        
helper.getPartition(numBucketsFunction.getNumBuckets(partitionKeyPair._1), 
partitionKeyPair._1, partitionKeyPair._2, partitionNum)
+      }
+    }
+  }
 
+  def getLocalePartitioner(numBucketsFunction: NumBucketsFunction, 
partitionNum: Int): Partitioner = {
+    new Partitioner {
       private val partitionIndexFunc: Functions.Function3[Integer, String, 
Integer, Integer] =
         BucketIndexUtil.getPartitionIndexFunc(partitionNum)
 
@@ -54,11 +76,5 @@ object BucketPartitionUtils {
         
partitionIndexFunc.apply(numBucketsFunction.getNumBuckets(partitionKeyPair._1), 
partitionKeyPair._1, partitionKeyPair._2)
       }
     }
-    // use internalRow to avoid extra convert.
-    val reRdd = df.queryExecution.toRdd
-      .keyBy(row => getPartitionKey(row))
-      .repartitionAndSortWithinPartitions(partitioner)
-      .values
-    df.sparkSession.internalCreateDataFrame(reRdd, df.schema)
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
new file mode 100644
index 00000000000..8bce5bb01f3
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Consts;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class RemotePartitionHelper implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemotePartitionHelper.class);
+
+  public static final String URL = "/v1/hoodie/partitioner/getpartitionindex";
+  public static final String NUM_BUCKETS_PARAM = "numbuckets";
+  public static final String PARTITION_PATH_PARAM = "partitionpath";
+  public static final String CUR_BUCKET_PARAM = "curbucket";
+  public static final String PARTITION_NUM_PARAM = "partitionnum";
+  private final RetryHelper retryHelper;
+  private final String serverHost;
+  private final Integer serverPort;
+  private final ObjectMapper mapper;
+  private final int timeoutMs;
+  private final HashMap<String, Integer> cache; // dataPartition -> 
sparkPartitionIndex
+  public RemotePartitionHelper(FileSystemViewStorageConfig viewConf) {
+    this.retryHelper = new RetryHelper(
+        viewConf.getRemoteTimelineClientMaxRetryIntervalMs(),
+        viewConf.getRemoteTimelineClientMaxRetryNumbers(),
+        viewConf.getRemoteTimelineInitialRetryIntervalMs(),
+        viewConf.getRemoteTimelineClientRetryExceptions(),
+        "Sending request");
+    this.serverHost = viewConf.getRemoteViewServerHost();
+    this.serverPort = viewConf.getRemoteViewServerPort();
+    this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000;
+    this.mapper = new ObjectMapper();
+    this.cache = new HashMap<>();
+  }
+
+  public int getPartition(int numBuckets, String partitionPath, int curBucket, 
int partitionNum) throws Exception {
+    if (cache.containsKey(partitionPath)) {
+      return computeActualPartition(cache.get(partitionPath), curBucket, 
partitionNum);
+    }
+    URIBuilder builder =
+        new 
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(URL).setScheme("http");
+
+    builder.addParameter(NUM_BUCKETS_PARAM, String.valueOf(numBuckets));
+    builder.addParameter(PARTITION_PATH_PARAM, partitionPath);
+    builder.addParameter(CUR_BUCKET_PARAM, String.valueOf(curBucket));
+    builder.addParameter(PARTITION_NUM_PARAM, String.valueOf(partitionNum));
+
+    String url = builder.toString();
+    LOG.info("Sending request : (" + url + ")");
+    Response response = (Response)(retryHelper != null ? retryHelper.start(() 
-> 
Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute())
+        : 
Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute());
+    String content = response.returnContent().asString(Consts.UTF_8);
+    int partitionIndex = Integer.parseInt(mapper.readValue(content, new 
TypeReference<String>() {}));
+    cache.put(partitionPath, partitionIndex);
+    return computeActualPartition(partitionIndex, curBucket, partitionNum);
+  }
+
+  private int computeActualPartition(int startOffset, int curBucket, int 
partitionNum) {
+    int res = startOffset + curBucket;
+    return res >= partitionNum ? res % partitionNum : res;
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index ab623643a62..0e1621e0f19 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -59,7 +59,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends 
DatasetBulkInsertO
    */
   @Override
   protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean 
populateMetaFields, boolean isTablePartitioned) {
-    return new 
BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(),
 hashingConfig);
+    return new BucketIndexBulkInsertPartitionerWithRows(writeConfig, 
hashingConfig);
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
index 67915f9a588..783fca28f4f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala
@@ -22,21 +22,25 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, 
TypedProperties}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.config.HoodieIndexConfig
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.index.bucket.BucketIdentifier
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.{BucketPartitionUtils, 
HoodieCatalystExpressionUtils, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
 
+import java.util
+
 @Tag("functional")
 class TestBucketIndexSupport extends HoodieSparkClientTestBase with 
PredicateHelper {
 
@@ -78,6 +82,39 @@ class TestBucketIndexSupport extends 
HoodieSparkClientTestBase with PredicateHel
     cleanupSparkContexts()
   }
 
+  @Test
+  def testBucketIndexRemotePartitioner(): Unit = {
+    initTimelineService()
+    timelineService.startService()
+    val numBuckets = 511
+    val config = 
HoodieWriteConfig.newBuilder.withPath(basePath).withIndexConfig(HoodieIndexConfig.newBuilder().withBucketNum(numBuckets.toString).build())
+      .withFileSystemViewConfig(FileSystemViewStorageConfig
+        
.newBuilder.withRemoteServerPort(timelineService.getServerPort).build).build
+    val numBucketsFunction = NumBucketsFunction.fromWriteConfig(config)
+    val partitionNum = 1533
+    val partitioner = 
BucketPartitionUtils.getRemotePartitioner(config.getViewStorageConfig, 
numBucketsFunction, partitionNum)
+    val dataPartitions = List("dt=20250501", "dt=20250502", "dt=20250503", 
"dt=20250504", "dt=20250505", "dt=20250506")
+    val res = new util.HashMap[Int, Int]
+    dataPartitions.foreach(dataPartition => {
+      for (i <- 1 to numBuckets) {
+        // for bucket id from 1 to numBuckets
+        // mock 1000 spark partitions
+        val sparkPartition = partitioner.getPartition((dataPartition, i))
+        if (res.containsKey(sparkPartition)) {
+          val value = res.get(sparkPartition) + 1
+          res.put(sparkPartition, value)
+        } else {
+          res.put(sparkPartition, 1)
+        }
+      }
+    })
+    timelineService.close()
+
+    res.values().stream().forEach(value => {
+      assert(value == (numBuckets*dataPartitions.size/partitionNum))
+    })
+  }
+
   @Test
   def testSingleHashFieldsExpression: Unit = {
     val bucketNumber = 19
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 626c3836198..afe51f2f0ea 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -36,6 +36,7 @@ import 
org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RemotePartitionHelper;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -43,6 +44,7 @@ import 
org.apache.hudi.timeline.service.handlers.BaseFileHandler;
 import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
 import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
 import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.hudi.timeline.service.handlers.RemotePartitionerHandler;
 import org.apache.hudi.timeline.service.handlers.TimelineHandler;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -86,6 +88,7 @@ public class RequestHandler {
   private final BaseFileHandler dataFileHandler;
   private final MarkerHandler markerHandler;
   private final InstantStateHandler instantStateHandler;
+  private RemotePartitionerHandler partitionerHandler;
   private final Registry metricsRegistry = 
Registry.getRegistry("TimelineService");
   private final ScheduledExecutorService asyncResultService;
 
@@ -104,6 +107,9 @@ public class RequestHandler {
     } else {
       this.markerHandler = null;
     }
+    if (timelineServiceConfig.enableRemotePartitioner) {
+      this.partitionerHandler = new RemotePartitionerHandler(conf, 
timelineServiceConfig, viewManager);
+    }
     if (timelineServiceConfig.enableInstantStateRequests) {
       this.instantStateHandler = new InstantStateHandler(conf, 
timelineServiceConfig, viewManager);
     } else {
@@ -194,6 +200,9 @@ public class RequestHandler {
     if (markerHandler != null) {
       registerMarkerAPI();
     }
+    if (partitionerHandler != null) {
+      registerRemotePartitionerAPI();
+    }
     if (instantStateHandler != null) {
       registerInstantStateAPI();
     }
@@ -559,6 +568,17 @@ public class RequestHandler {
     }, false));
   }
 
+  private void registerRemotePartitionerAPI() {
+    app.get(RemotePartitionHelper.URL, new ViewHandler(ctx -> {
+      int partition = partitionerHandler.gePartitionIndex(
+          ctx.queryParamAsClass(RemotePartitionHelper.NUM_BUCKETS_PARAM, 
String.class).getOrDefault(""),
+          ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_PATH_PARAM, 
String.class).getOrDefault(""),
+          ctx.queryParamAsClass(RemotePartitionHelper.CUR_BUCKET_PARAM, 
String.class).getOrDefault(""),
+          ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_NUM_PARAM, 
String.class).getOrDefault(""));
+      writeValueAsString(ctx, partition);
+    }, false));
+  }
+
   /**
    * Used for logging and performing refresh check.
    */
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index e9300fc0dd9..3843e0cf97c 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -111,6 +111,9 @@ public class TimelineService {
     @Parameter(names = {"--enable-marker-requests", "-em"}, description = 
"Enable handling of marker-related requests")
     public boolean enableMarkerRequests = false;
 
+    @Parameter(names = {"--enable-remote-partitioner"}, description = "Enable 
remote partitioner")
+    public boolean enableRemotePartitioner = true;
+
     @Parameter(names = {"--enable-instant-state-requests"}, description = 
"Enable handling of instant state requests")
     public boolean enableInstantStateRequests = false;
 
@@ -195,6 +198,7 @@ public class TimelineService {
       private Long maxAllowableHeartbeatIntervalInMs = 120000L;
 
       private int instantStateForceRefreshRequestNumber = 100;
+      private boolean enableRemotePartitioner = true;
 
       public Builder() {
       }
@@ -249,6 +253,11 @@ public class TimelineService {
         return this;
       }
 
+      public Builder enableRemotePartitioner(boolean enableRemotePartitioner) {
+        this.enableRemotePartitioner = enableRemotePartitioner;
+        return this;
+      }
+
       public Builder markerBatchNumThreads(int markerBatchNumThreads) {
         this.markerBatchNumThreads = markerBatchNumThreads;
         return this;
@@ -316,6 +325,7 @@ public class TimelineService {
         config.async = this.async;
         config.compress = this.compress;
         config.enableMarkerRequests = this.enableMarkerRequests;
+        config.enableRemotePartitioner = this.enableRemotePartitioner;
         config.markerBatchNumThreads = this.markerBatchNumThreads;
         config.markerBatchIntervalMs = this.markerBatchIntervalMs;
         config.markerParallelism = this.markerParallelism;
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
new file mode 100644
index 00000000000..5abc03fde3f
--- /dev/null
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/RemotePartitionerHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers;
+
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemotePartitionerHandler extends Handler {
+
+  // cache Map<PartitionPath, BucketStartIndex>
+  private final ConcurrentHashMap<String, Integer> cache;
+  private final AtomicInteger nextIndex = new AtomicInteger(0);
+
+  public RemotePartitionerHandler(StorageConfiguration<?> conf, 
TimelineService.Config timelineServiceConfig,
+                                  FileSystemViewManager viewManager) {
+    super(conf, timelineServiceConfig, viewManager);
+    this.cache = new ConcurrentHashMap<>();
+  }
+
+  public int gePartitionIndex(String numBuckets, String partitionPath, String 
curBucket, String partitionNum) {
+    int num = Integer.parseInt(numBuckets);
+    int partNum = Integer.parseInt(partitionNum);
+
+    return cache.computeIfAbsent(partitionPath, key -> {
+      int current;
+      int newNext;
+      int res;
+      do {
+        current = nextIndex.get();
+        res = current;
+        newNext = current + num;
+        if (newNext >= partNum) {
+          newNext = newNext % partNum;
+        }
+      } while (!nextIndex.compareAndSet(current, newNext));
+      return res;
+    });
+  }
+}

Reply via email to