QuChunhe opened a new issue, #6585:
URL: https://github.com/apache/hudi/issues/6585
Every row of data is about 0.72 KB. Every insert commit with 300 rows takes
about 2 minutes. But when increasing the number of rows per commit, the time
of each commit did not increase much. Even if each insert commit has 4500
rows, each commit still takes just over 2 minutes.
1. Hudi version: 0.12.0, aliyun oss file system, flink 1.13.6.2.
2. Spark beeline is used to create the table. The table properties are as
follows.
USING hudi
PARTITIONED BY (bizdate)
TBLPROPERTIES (
'primaryKey' = 'timestamp,serial_number,message_id',
'type' = 'mor',
'preCombineField' = 'timestamp',
'hoodie.datasource.write.hive_style_partitioning'='false',
'hoodie.database.name'='gs_ods',
'hoodie.table.base.file.format'='parquet',
'hoodie.parquet.writelegacyformat.enabled'='false'
);
3. Hudi Java client in a flink pipeline writes the data from a Kafka
cluster, and the configuration is as follows.
// Base file format of the Hudi table (matches the TBLPROPERTIES above).
private String baseFileFormat = "parquet";
// Record key field names; assigned elsewhere (not shown in this snippet).
private String recordKeyFields;
// Parallelism passed to the insert/upsert/delete/finalize settings below.
private int parallelism = 20;
// Single writer — NOTE(review): presumably no lock provider is actually
// needed in this mode; confirm against Hudi concurrency-control docs.
private WriteConcurrencyMode writeConcurrencyMode =
WriteConcurrencyMode.SINGLE_WRITER;
// Default (unconfigured) lock config, passed to the write config below.
private HoodieLockConfig hoodieLockConfig =
HoodieLockConfig.newBuilder().build();
// EAGER failed-writes cleaning policy, wired into the clean config below.
private HoodieFailedWritesCleaningPolicy hoodieFailedWritesCleaningPolicy
= HoodieFailedWritesCleaningPolicy.EAGER;
@Override
public void open(Configuration parameters) throws Exception {
// NOTE(review): the "****" below appears to be content redacted when the
// snippet was pasted into the issue; it is not valid Java as shown.
****
// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
.withPath(tablePath)
.withSchema(schema)
.forTable(tableName)
// Commit automatically after each write call — no explicit client.commit().
.withAutoCommit(true)
.withTableServicesEnabled(true)
.withEmbeddedTimelineServerEnabled(true)
// Timeline-server-based markers, with rollback driven by those markers.
.withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name())
.withRollbackUsingMarkers(true)
// Same parallelism for delete/write/finalize; half of it for rollback.
.withDeleteParallelism(parallelism)
.withParallelism(parallelism, parallelism)
.withFinalizeWriteParallelism(parallelism)
.withRollbackParallelism(parallelism / 2)
// 32 MB write buffer limit.
.withWriteBufferLimitBytes(32 * 1024 * 1024)
.withWriteConcurrencyMode(writeConcurrencyMode)
.withLockConfig(hoodieLockConfig)
//.withEngineType(EngineType.SPARK)
// Synchronous (non-async) auto clean with the EAGER failed-writes policy.
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withAutoClean(true)
.withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
.withAsyncClean(false)
.build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.parquetWriteLegacyFormat("false")
.build())
// Metadata table enabled; its clean and index run synchronously.
.withMetadataConfig(
HoodieMetadataConfig.newBuilder()
.withAsyncClean(false)
.withAsyncIndex(false)
.enable(true)
.build())
.withIndexConfig(
HoodieIndexConfig.newBuilder()
.withIndexType(IndexType.BLOOM)
.build())
// NOTE(review): presumably (min=40, max=60) commits retained on the active
// timeline before archiving — confirm against HoodieArchivalConfig docs.
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(40, 60)
.build())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withCompactionLazyBlockReadEnabled(true)
.build())
.build();
// Java write client bound to this task's Hadoop configuration.
client = new HoodieJavaWriteClient<>(new
HoodieJavaEngineContext(hadoopConf), cfg);
// Gson instance — presumably used by toHoodieRecords(...) to serialize
// incoming values (TODO confirm); FieldStateEnum fields go through
// VoidJsonSerializer, and field naming is configurable via
// doesUseLowerCaseWithUnderScores.
gson = (new GsonBuilder()).setExclusionStrategies()
.registerTypeAdapter(FieldStateEnum.class, new VoidJsonSerializer())
.enableComplexMapKeySerialization()
.setLenient()
.setFieldNamingPolicy(doesUseLowerCaseWithUnderScores
? FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES :
FieldNamingPolicy.IDENTITY)
.create();
}
@Override
public void invoke(T value, Context context) throws Exception {
    // Convert the incoming element into Hudi records; skip empty batches.
    List<HoodieRecord<HoodieJsonPayload>> records = toHoodieRecords(value);
    if (null == records || records.isEmpty()) {
        return;
    }
    List<WriteStatus> statusList;
    try {
        // Start a new commit and insert the whole batch under it.
        String newCommitTime = client.startCommit();
        statusList = client.insert(records, newCommitTime);
    } catch (Exception e) {
        log.error("Meet some errors " + Arrays.toString(records.toArray()), e);
        throw e;
    }
    // BUG FIX: the original inspected only statusList.get(0), which throws
    // IndexOutOfBoundsException on an empty result and silently drops
    // errors reported by every other WriteStatus. Inspect them all.
    if (null == statusList) {
        return;
    }
    for (WriteStatus status : statusList) {
        Map<HoodieKey, Throwable> errors = status.getErrors();
        if (null == errors || errors.isEmpty()) {
            continue;
        }
        for (Map.Entry<HoodieKey, Throwable> e : errors.entrySet()) {
            // Per-record failure: log the key and its cause, keep going.
            log.error("Can not insert into " + e.getKey().getRecordKey(),
                e.getValue());
        }
    }
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]