This is an automated email from the ASF dual-hosted git repository. zhangyue19921010 pushed a commit to branch remote-partitioner in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c9af6a148b6af156bea74e05ee52b24d06c2d8f6 Author: YueZhang <[email protected]> AuthorDate: Tue May 6 11:53:39 2025 +0800 remote partitioner --- .../client/embedded/EmbeddedTimelineService.java | 4 + .../org/apache/hudi/config/HoodieIndexConfig.java | 5 ++ .../org/apache/hudi/config/HoodieWriteConfig.java | 4 + .../BucketIndexBulkInsertPartitionerWithRows.java | 21 ++++- .../apache/spark/sql/BucketPartitionUtils.scala | 34 +++++--- .../hudi/common/util/RemotePartitionHelper.java | 90 ++++++++++++++++++++++ .../DatasetBucketRescaleCommitActionExecutor.java | 2 +- .../hudi/functional/TestBucketIndexSupport.scala | 41 +++++++++- .../hudi/timeline/service/RequestHandler.java | 20 +++++ .../hudi/timeline/service/TimelineService.java | 10 +++ .../service/handlers/RemotePartitionerHandler.java | 59 ++++++++++++++ 11 files changed, 275 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index a28b2235e3a..de306e8130b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -170,6 +170,10 @@ public class EmbeddedTimelineService { * writeConfig.getHoodieClientHeartbeatTolerableMisses()); } + if (writeConfig.isUsingRemotePartitioner()) { + timelineServiceConfBuilder.enableRemotePartitioner(true); + } + if (writeConfig.isTimelineServerBasedInstantStateEnabled()) { timelineServiceConfBuilder .instantStateForceRefreshRequestNumber(writeConfig.getTimelineServerBasedInstantStateForceRefreshRequestNumber()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java 
index 6458c02daaa..d6a13b81c0f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -287,6 +287,11 @@ public class HoodieIndexConfig extends HoodieConfig { .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, " + "and each partition is divided to N buckets."); + public static final ConfigProperty<Boolean> BUCKET_PARTITIONER = ConfigProperty + .key("hoodie.bucket.remote.partitioner.enable") + .defaultValue(true) + .withDocumentation("Use Remote Partitioner to do repartition based on bucket."); + public static final ConfigProperty<String> BUCKET_INDEX_PARTITION_RULE_TYPE = ConfigProperty .key("hoodie.bucket.index.partition.rule.type") .defaultValue(PartitionBucketIndexRule.REGEX.name) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 770f0535d00..19ce90bbb5d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2281,6 +2281,10 @@ public class HoodieWriteConfig extends HoodieConfig { public String getRecordIndexInputStorageLevel() { return getStringOrDefault(HoodieIndexConfig.RECORD_INDEX_INPUT_STORAGE_LEVEL_VALUE); } + + public boolean isUsingRemotePartitioner() { + return getBoolean(HoodieIndexConfig.BUCKET_PARTITIONER); + } /** * storage properties. 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java index db7a64de69f..9816febb041 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java @@ -19,10 +19,12 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.bucket.partition.NumBucketsFunction; import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.Partitioner; import org.apache.spark.sql.BucketPartitionUtils$; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -34,21 +36,34 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti private final String indexKeyFields; private final NumBucketsFunction numBucketsFunction; + private final HoodieWriteConfig writeConfig; + private FileSystemViewStorageConfig viewConfig; public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, HoodieWriteConfig writeConfig) { this.indexKeyFields = indexKeyFields; this.numBucketsFunction = NumBucketsFunction.fromWriteConfig(writeConfig); + this.writeConfig = writeConfig; + if (writeConfig.isUsingRemotePartitioner()) { + this.viewConfig = writeConfig.getViewStorageConfig(); + } } - public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, PartitionBucketIndexHashingConfig hashingConfig) { - this.indexKeyFields = indexKeyFields; + public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig 
writeConfig, PartitionBucketIndexHashingConfig hashingConfig) { + this.indexKeyFields = writeConfig.getBucketIndexHashFieldWithDefault(); this.numBucketsFunction = new NumBucketsFunction(hashingConfig.getExpressions(), hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber()); + this.writeConfig = writeConfig; + if (writeConfig.isUsingRemotePartitioner()) { + this.viewConfig = writeConfig.getViewStorageConfig(); + } } @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputPartitions) { - return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, numBucketsFunction, outputPartitions); + Partitioner partitioner = writeConfig.isUsingRemotePartitioner() && writeConfig.isEmbeddedTimelineServerEnabled() + ? BucketPartitionUtils$.MODULE$.getRemotePartitioner(viewConfig, numBucketsFunction, outputPartitions) + : BucketPartitionUtils$.MODULE$.getLocalePartitioner(numBucketsFunction, outputPartitions); + return BucketPartitionUtils$.MODULE$.createDataFrame(rows, indexKeyFields, numBucketsFunction, partitioner); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala index ecdf406871f..9e3784cf0ca 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.common.util.Functions +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.util.{Functions, RemotePartitionHelper} import org.apache.hudi.common.util.hash.BucketIndexUtil import org.apache.hudi.index.bucket.BucketIdentifier import org.apache.hudi.index.bucket.partition.NumBucketsFunction @@ -28,7 +29,7 @@ import 
org.apache.spark.Partitioner import org.apache.spark.sql.catalyst.InternalRow object BucketPartitionUtils { - def createDataFrame(df: DataFrame, indexKeyFields: String, numBucketsFunction: NumBucketsFunction, partitionNum: Int): DataFrame = { + def createDataFrame(df: DataFrame, indexKeyFields: String, numBucketsFunction: NumBucketsFunction, partitioner: Partitioner): DataFrame = { def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => { val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD) val kb = BucketIdentifier @@ -42,8 +43,29 @@ object BucketPartitionUtils { } val getPartitionKey = getPartitionKeyExtractor() - val partitioner = new Partitioner { + // use internalRow to avoid extra convert. + val reRdd = df.queryExecution.toRdd + .keyBy(row => getPartitionKey(row)) + .repartitionAndSortWithinPartitions(partitioner) + .values + df.sparkSession.internalCreateDataFrame(reRdd, df.schema) + } + + def getRemotePartitioner(viewConf: FileSystemViewStorageConfig, numBucketsFunction: NumBucketsFunction, partitionNum: Int): Partitioner = { + new Partitioner { + private val helper = new RemotePartitionHelper(viewConf) + + override def numPartitions: Int = partitionNum + + override def getPartition(value: Any): Int = { + val partitionKeyPair = value.asInstanceOf[(String, Int)] + helper.getPartition(numBucketsFunction.getNumBuckets(partitionKeyPair._1), partitionKeyPair._1, partitionKeyPair._2, partitionNum) + } + } + } + def getLocalePartitioner(numBucketsFunction: NumBucketsFunction, partitionNum: Int): Partitioner = { + new Partitioner { private val partitionIndexFunc: Functions.Function3[Integer, String, Integer, Integer] = BucketIndexUtil.getPartitionIndexFunc(partitionNum) @@ -54,11 +76,5 @@ object BucketPartitionUtils { partitionIndexFunc.apply(numBucketsFunction.getNumBuckets(partitionKeyPair._1), partitionKeyPair._1, partitionKeyPair._2) } } - // use internalRow to avoid extra convert. 
- val reRdd = df.queryExecution.toRdd - .keyBy(row => getPartitionKey(row)) - .repartitionAndSortWithinPartitions(partitioner) - .values - df.sparkSession.internalCreateDataFrame(reRdd, df.schema) } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java new file mode 100644 index 00000000000..8bce5bb01f3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RemotePartitionHelper.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.Consts; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; +import org.apache.http.client.utils.URIBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; + +public class RemotePartitionHelper implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(RemotePartitionHelper.class); + + public static final String URL = "/v1/hoodie/partitioner/getpartitionindex"; + public static final String NUM_BUCKETS_PARAM = "numbuckets"; + public static final String PARTITION_PATH_PARAM = "partitionpath"; + public static final String CUR_BUCKET_PARAM = "curbucket"; + public static final String PARTITION_NUM_PARAM = "partitionnum"; + private final RetryHelper retryHelper; + private final String serverHost; + private final Integer serverPort; + private final ObjectMapper mapper; + private final int timeoutMs; + private final HashMap<String, Integer> cache; // dataPartition -> sparkPartitionIndex + public RemotePartitionHelper(FileSystemViewStorageConfig viewConf) { + this.retryHelper = new RetryHelper( + viewConf.getRemoteTimelineClientMaxRetryIntervalMs(), + viewConf.getRemoteTimelineClientMaxRetryNumbers(), + viewConf.getRemoteTimelineInitialRetryIntervalMs(), + viewConf.getRemoteTimelineClientRetryExceptions(), + "Sending request"); + this.serverHost = viewConf.getRemoteViewServerHost(); + this.serverPort = viewConf.getRemoteViewServerPort(); + this.timeoutMs = viewConf.getRemoteTimelineClientTimeoutSecs() * 1000; + this.mapper = new ObjectMapper(); + this.cache = new HashMap<>(); + } + + public int getPartition(int numBuckets, String partitionPath, int curBucket, int partitionNum) throws 
Exception { + if (cache.containsKey(partitionPath)) { + return computeActualPartition(cache.get(partitionPath), curBucket, partitionNum); + } + URIBuilder builder = + new URIBuilder().setHost(serverHost).setPort(serverPort).setPath(URL).setScheme("http"); + + builder.addParameter(NUM_BUCKETS_PARAM, String.valueOf(numBuckets)); + builder.addParameter(PARTITION_PATH_PARAM, partitionPath); + builder.addParameter(CUR_BUCKET_PARAM, String.valueOf(curBucket)); + builder.addParameter(PARTITION_NUM_PARAM, String.valueOf(partitionNum)); + + String url = builder.toString(); + LOG.info("Sending request : (" + url + ")"); + Response response = (Response)(retryHelper != null ? retryHelper.start(() -> Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute()) + : Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute()); + String content = response.returnContent().asString(Consts.UTF_8); + int partitionIndex = Integer.parseInt(mapper.readValue(content, new TypeReference<String>() {})); + cache.put(partitionPath, partitionIndex); + return computeActualPartition(partitionIndex, curBucket, partitionNum); + } + + private int computeActualPartition(int startOffset, int curBucket, int partitionNum) { + int res = startOffset + curBucket; + return res >= partitionNum ? 
res % partitionNum : res; + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java index ab623643a62..0e1621e0f19 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java @@ -59,7 +59,7 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO */ @Override protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMetaFields, boolean isTablePartitioned) { - return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(), hashingConfig); + return new BucketIndexBulkInsertPartitionerWithRows(writeConfig, hashingConfig); } /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala index 67915f9a588..783fca28f4f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBucketIndexSupport.scala @@ -22,21 +22,25 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.config.HoodieIndexConfig +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.index.HoodieIndex 
import org.apache.hudi.index.bucket.BucketIdentifier +import org.apache.hudi.index.bucket.partition.NumBucketsFunction import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.testutils.HoodieSparkClientTestBase import org.apache.avro.Schema import org.apache.avro.generic.GenericData -import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession} +import org.apache.spark.sql.{BucketPartitionUtils, HoodieCatalystExpressionUtils, SparkSession} import org.apache.spark.sql.catalyst.encoders.DummyExpressionHolder import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.types._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} +import java.util + @Tag("functional") class TestBucketIndexSupport extends HoodieSparkClientTestBase with PredicateHelper { @@ -78,6 +82,39 @@ class TestBucketIndexSupport extends HoodieSparkClientTestBase with PredicateHel cleanupSparkContexts() } + @Test + def testBucketIndexRemotePartitioner(): Unit = { + initTimelineService() + timelineService.startService() + val numBuckets = 511 + val config = HoodieWriteConfig.newBuilder.withPath(basePath).withIndexConfig(HoodieIndexConfig.newBuilder().withBucketNum(numBuckets.toString).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig + .newBuilder.withRemoteServerPort(timelineService.getServerPort).build).build + val numBucketsFunction = NumBucketsFunction.fromWriteConfig(config) + val partitionNum = 1533 + val partitioner = BucketPartitionUtils.getRemotePartitioner(config.getViewStorageConfig, numBucketsFunction, partitionNum) + val dataPartitions = List("dt=20250501", "dt=20250502", "dt=20250503", "dt=20250504", "dt=20250505", "dt=20250506") + val res = new util.HashMap[Int, Int] + dataPartitions.foreach(dataPartition => { + for (i <- 1 to numBuckets) { + // for bucket id from 1 to numBuckets + // mock 1000 spark partitions + 
val sparkPartition = partitioner.getPartition((dataPartition, i)) + if (res.containsKey(sparkPartition)) { + val value = res.get(sparkPartition) + 1 + res.put(sparkPartition, value) + } else { + res.put(sparkPartition, 1) + } + } + }) + timelineService.close() + + res.values().stream().forEach(value => { + assert(value == (numBuckets*dataPartitions.size/partitionNum)) + }) + } + @Test def testSingleHashFieldsExpression: Unit = { val bucketNumber = 19 diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 626c3836198..afe51f2f0ea 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.RemotePartitionHelper; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.storage.StorageConfiguration; @@ -43,6 +44,7 @@ import org.apache.hudi.timeline.service.handlers.BaseFileHandler; import org.apache.hudi.timeline.service.handlers.FileSliceHandler; import org.apache.hudi.timeline.service.handlers.InstantStateHandler; import org.apache.hudi.timeline.service.handlers.MarkerHandler; +import org.apache.hudi.timeline.service.handlers.RemotePartitionerHandler; import org.apache.hudi.timeline.service.handlers.TimelineHandler; import com.fasterxml.jackson.core.JsonProcessingException; @@ -86,6 +88,7 @@ public class RequestHandler { private final BaseFileHandler dataFileHandler; private final MarkerHandler markerHandler; private final InstantStateHandler 
instantStateHandler; + private RemotePartitionerHandler partitionerHandler; private final Registry metricsRegistry = Registry.getRegistry("TimelineService"); private final ScheduledExecutorService asyncResultService; @@ -104,6 +107,9 @@ public class RequestHandler { } else { this.markerHandler = null; } + if (timelineServiceConfig.enableRemotePartitioner) { + this.partitionerHandler = new RemotePartitionerHandler(conf, timelineServiceConfig, viewManager); + } if (timelineServiceConfig.enableInstantStateRequests) { this.instantStateHandler = new InstantStateHandler(conf, timelineServiceConfig, viewManager); } else { @@ -194,6 +200,9 @@ public class RequestHandler { if (markerHandler != null) { registerMarkerAPI(); } + if (partitionerHandler != null) { + registerRemotePartitionerAPI(); + } if (instantStateHandler != null) { registerInstantStateAPI(); } @@ -559,6 +568,17 @@ public class RequestHandler { }, false)); } + private void registerRemotePartitionerAPI() { + app.get(RemotePartitionHelper.URL, new ViewHandler(ctx -> { + int partition = partitionerHandler.gePartitionIndex( + ctx.queryParamAsClass(RemotePartitionHelper.NUM_BUCKETS_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_PATH_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemotePartitionHelper.CUR_BUCKET_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemotePartitionHelper.PARTITION_NUM_PARAM, String.class).getOrDefault("")); + writeValueAsString(ctx, partition); + }, false)); + } + /** * Used for logging and performing refresh check. 
*/ diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index e9300fc0dd9..3843e0cf97c 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -111,6 +111,9 @@ public class TimelineService { @Parameter(names = {"--enable-marker-requests", "-em"}, description = "Enable handling of marker-related requests") public boolean enableMarkerRequests = false; + @Parameter(names = {"--enable-remote-partitioner"}, description = "Enable remote partitioner") + public boolean enableRemotePartitioner = true; + @Parameter(names = {"--enable-instant-state-requests"}, description = "Enable handling of instant state requests") public boolean enableInstantStateRequests = false; @@ -195,6 +198,7 @@ public class TimelineService { private Long maxAllowableHeartbeatIntervalInMs = 120000L; private int instantStateForceRefreshRequestNumber = 100; + private boolean enableRemotePartitioner = true; public Builder() { } @@ -249,6 +253,11 @@ public class TimelineService { return this; } + public Builder enableRemotePartitioner(boolean enableRemotePartitioner) { + this.enableRemotePartitioner = enableRemotePartitioner; + return this; + } + public Builder markerBatchNumThreads(int markerBatchNumThreads) { this.markerBatchNumThreads = markerBatchNumThreads; return this; @@ -316,6 +325,7 @@ public class TimelineService { config.async = this.async; config.compress = this.compress; config.enableMarkerRequests = this.enableMarkerRequests; + config.enableRemotePartitioner = this.enableRemotePartitioner; config.markerBatchNumThreads = this.markerBatchNumThreads; config.markerBatchIntervalMs = this.markerBatchIntervalMs; config.markerParallelism = this.markerParallelism; diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.timeline.service.handlers;

import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.timeline.service.TimelineService;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Timeline-server side of the remote bucket partitioner.
 *
 * <p>For every table partition path it is asked about, this handler hands out a start offset
 * exactly once and remembers it. Each new partition path receives the current value of a shared
 * counter, after which the counter is advanced by the number of buckets, wrapping around at the
 * number of Spark partitions. The client ({@code RemotePartitionHelper}) adds the bucket id to
 * the returned offset to obtain the final Spark partition.
 */
public class RemotePartitionerHandler extends Handler {

  // Partition path -> start offset handed out on the first request for that path.
  private final ConcurrentHashMap<String, Integer> cache;
  // Start offset that the next previously unseen partition path will receive.
  private final AtomicInteger nextIndex = new AtomicInteger(0);

  public RemotePartitionerHandler(StorageConfiguration<?> conf, TimelineService.Config timelineServiceConfig,
                                  FileSystemViewManager viewManager) {
    super(conf, timelineServiceConfig, viewManager);
    this.cache = new ConcurrentHashMap<>();
  }

  /**
   * Returns the start offset assigned to {@code partitionPath}, assigning one on first use.
   *
   * <p>Once a path has an offset, later calls return the same value; the {@code numBuckets} and
   * {@code partitionNum} of those later calls are still validated but no longer influence the result.
   *
   * @param numBuckets    decimal string, number of buckets of the partition; must be non-negative
   * @param partitionPath table partition path used as the cache key
   * @param curBucket     bucket id of the record; not used here, the caller adds it to the returned offset
   * @param partitionNum  decimal string, number of Spark partitions; must be positive
   * @return the start offset for {@code partitionPath}
   * @throws NumberFormatException    if {@code numBuckets} or {@code partitionNum} is not a valid integer
   * @throws IllegalArgumentException if {@code numBuckets} is negative or {@code partitionNum} is not positive
   */
  public int getPartitionIndex(String numBuckets, String partitionPath, String curBucket, String partitionNum) {
    final int num = parseIntParam("numBuckets", numBuckets);
    final int partNum = parseIntParam("partitionNum", partitionNum);
    if (num < 0) {
      throw new IllegalArgumentException("numBuckets must not be negative, got " + num);
    }
    if (partNum <= 0) {
      // Guards the modulo below; a zero value used to surface as a bare "/ by zero".
      throw new IllegalArgumentException("partitionNum must be positive, got " + partNum);
    }

    // computeIfAbsent assigns at most one offset per path. Different paths may still race on the
    // counter, so it is advanced atomically; the sum is taken in long so it cannot overflow int.
    return cache.computeIfAbsent(partitionPath,
        key -> nextIndex.getAndUpdate(current -> (int) (((long) current + num) % partNum)));
  }

  /**
   * Misspelled original name of {@link #getPartitionIndex(String, String, String, String)},
   * kept so existing callers continue to compile.
   *
   * @deprecated use {@link #getPartitionIndex(String, String, String, String)} instead
   */
  @Deprecated
  public int gePartitionIndex(String numBuckets, String partitionPath, String curBucket, String partitionNum) {
    return getPartitionIndex(numBuckets, partitionPath, curBucket, partitionNum);
  }

  /**
   * Parses a request parameter as an int, naming the parameter in the failure message.
   */
  private static int parseIntParam(String name, String value) {
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      // Same exception type as before, but with the parameter name and the cause preserved.
      NumberFormatException wrapped =
          new NumberFormatException("Request parameter '" + name + "' is not a valid integer: '" + value + "'");
      wrapped.initCause(e);
      throw wrapped;
    }
  }
}
