This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3bf9c5f [HUDI-3728] Set the sort operator parallelism for flink bucket bulk insert (#5154)
3bf9c5f is described below
commit 3bf9c5ffe80e84e9b0ba34e115e1a7a417f343e8
Author: Danny Chan <[email protected]>
AuthorDate: Tue Mar 29 09:52:35 2022 +0800
[HUDI-3728] Set the sort operator parallelism for flink bucket bulk insert (#5154)
---
.../sink/bucket/BucketStreamWriteFunction.java | 67 +++++++++-------------
.../java/org/apache/hudi/sink/utils/Pipelines.java | 6 +-
2 files changed, 30 insertions(+), 43 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 7e4cf68..1456e88 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -18,16 +18,12 @@
package org.apache.hudi.sink.bucket;
-import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.sink.StreamWriteFunction;
-import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,12 +35,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import static java.util.stream.Collectors.toList;
-
/**
* A stream write function with bucket hash index.
*
@@ -58,18 +51,14 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
private static final Logger LOG =
LoggerFactory.getLogger(BucketStreamWriteFunction.class);
- private int maxParallelism;
-
private int parallelism;
private int bucketNum;
- private transient HoodieFlinkTable<?> table;
-
private String indexKeyFields;
/**
- * BucketID should be load in this task.
+ * BucketID should be loaded in this task.
*/
private Set<Integer> bucketToLoad;
@@ -87,6 +76,11 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
private Set<String> incBucketIndex;
/**
+ * Returns whether this is an empty table.
+ */
+ private boolean isEmptyTable;
+
+ /**
* Constructs a BucketStreamWriteFunction.
*
* @param config The config options
@@ -102,17 +96,15 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
- this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
- this.bucketToLoad = new HashSet<>();
+ this.bucketToLoad = getBucketToLoad();
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashSet<>();
- getBucketToLoad();
+ this.isEmptyTable =
!this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
}
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
super.initializeState(context);
- this.table = this.writeClient.getHoodieTable();
}
@Override
@@ -129,19 +121,19 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
final HoodieRecordLocation location;
bootstrapIndexIfNeed(partition);
- Map<Integer, String> bucketToFileIdMap = bucketIndex.get(partition);
+ Map<Integer, String> bucketToFileId =
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
final int bucketNum = BucketIdentifier.getBucketId(hoodieKey,
indexKeyFields, this.bucketNum);
- final String partitionBucketId =
BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
+ final String bucketId = partition + bucketNum;
- if (incBucketIndex.contains(partitionBucketId)) {
- location = new HoodieRecordLocation("I",
bucketToFileIdMap.get(bucketNum));
- } else if (bucketToFileIdMap.containsKey(bucketNum)) {
- location = new HoodieRecordLocation("U",
bucketToFileIdMap.get(bucketNum));
+ if (incBucketIndex.contains(bucketId)) {
+ location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
+ } else if (bucketToFileId.containsKey(bucketNum)) {
+ location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
- bucketToFileIdMap.put(bucketNum,newFileId);
- incBucketIndex.add(partitionBucketId);
+ bucketToFileId.put(bucketNum, newFileId);
+ incBucketIndex.add(bucketId);
}
record.unseal();
record.setCurrentLocation(location);
@@ -153,39 +145,32 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
* Bootstrap bucket info from existing file system,
* bucketNum % totalParallelism == this taskID belongs to this task.
*/
- private void getBucketToLoad() {
+ private Set<Integer> getBucketToLoad() {
+ Set<Integer> bucketToLoad = new HashSet<>();
for (int i = 0; i < bucketNum; i++) {
int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
if (partitionOfBucket == taskID) {
- LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
- + "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
- i, parallelism, maxParallelism, taskID));
bucketToLoad.add(i);
}
}
- bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
+ LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID,
parallelism, bucketToLoad);
+ return bucketToLoad;
}
/**
* Get partition_bucket -> fileID mapping from the existing hudi table.
* This is a required operation for each restart to avoid having duplicate file ids for one bucket.
*/
- private void bootstrapIndexIfNeed(String partition) throws IOException {
- if (bucketIndex.containsKey(partition)) {
- return;
- }
- Option<HoodieInstant> latestCommitTime =
table.getHoodieView().getTimeline().filterCompletedInstants().lastInstant();
- if (!latestCommitTime.isPresent()) {
- bucketIndex.put(partition, new HashMap<>());
+ private void bootstrapIndexIfNeed(String partition) {
+ if (isEmptyTable || bucketIndex.containsKey(partition)) {
return;
}
- LOG.info(String.format("Loading Hoodie Table %s, with path %s",
table.getMetaClient().getTableConfig().getTableName(),
- table.getMetaClient().getBasePath() + "/" + partition));
+ LOG.info(String.format("Loading Hoodie Table %s, with path %s",
this.metaClient.getTableConfig().getTableName(),
+ this.metaClient.getBasePath() + "/" + partition));
// Load existing fileID belongs to this task
Map<Integer, String> bucketToFileIDMap = new HashMap<>();
- List<FileSlice> fileSlices =
table.getHoodieView().getLatestFileSlices(partition).collect(toList());
- for (FileSlice fileSlice : fileSlices) {
+
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice
-> {
String fileID = fileSlice.getFileId();
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
if (bucketToLoad.contains(bucketNumber)) {
@@ -198,7 +183,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
bucketToFileIDMap.put(bucketNumber, fileID);
}
}
- }
+ });
bucketIndex.put(partition, bucketToFileIDMap);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 1992edd..9f0a817 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -98,10 +98,12 @@ public class Pipelines {
InternalTypeInfo<RowData> typeInfo =
InternalTypeInfo.of(rowTypeWithFileId);
dataStream = dataStream.partitionCustom(partitioner,
rowDataKeyGen::getRecordKey)
.map(record ->
BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record,
indexKeyFields, bucketNum),
- typeInfo);
+ typeInfo)
+ .setParallelism(dataStream.getParallelism()); // same parallelism as source
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen =
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
- dataStream = dataStream.transform("file_sorter", typeInfo,
sortOperatorGen.createSortOperator());
+ dataStream = dataStream.transform("file_sorter", typeInfo,
sortOperatorGen.createSortOperator())
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}