QuChunhe opened a new issue, #6584:
URL: https://github.com/apache/hudi/issues/6584
The Hudi Java client does not seem to support multi-writing; it fails with "Cannot resolve conflicts for overlapping writes".

1. Environment: Hudi 0.12.0, Aliyun OSS file system, Flink 1.13.6, Hudi sink parallelism 2.
2. The Hudi Java client is configured as follows:
```java
public HudiSink setZookeeperLock(String zkQuorum) {
    if (Objects.isNull(zkQuorum)) {
        return this;
    }
    writeConcurrencyMode = WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
    hoodieFailedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
    hoodieLockConfig = HoodieLockConfig.newBuilder()
        .withZkQuorum(zkQuorum)
        .withZkBasePath("/" + databasePrefix)
        .withZkLockKey(tableName)
        .withZkConnectionTimeoutInMs(50L * 1000L)
        .withZkSessionTimeoutInMs(1500 * 1000L)
        .withRetryWaitTimeInMillis(60L * 1000L)
        .build();
    return this;
}

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
    String tablePath = warehousePath + databasePrefix + "/" + tableName;
    // initialize the table, if not done already
    Path path = new Path(tablePath);
    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
    String schema = Util.getStringFromResource("/" + databasePrefix + "." + tableName + ".schema");
    // Create the write client to write some records in
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(tablePath)
        .withSchema(schema)
        .forTable(tableName)
        .withAutoCommit(true)
        .withTableServicesEnabled(true)
        .withEmbeddedTimelineServerEnabled(true)
        .withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name())
        .withRollbackUsingMarkers(true)
        .withDeleteParallelism(parallelism)
        .withParallelism(parallelism, parallelism)
        .withFinalizeWriteParallelism(parallelism)
        .withRollbackParallelism(parallelism / 2)
        .withWriteBufferLimitBytes(32 * 1024 * 1024)
        .withWriteConcurrencyMode(writeConcurrencyMode)
        .withLockConfig(hoodieLockConfig)
        //.withEngineType(EngineType.SPARK)
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .withAutoClean(true)
            .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
            .withAsyncClean(false)
            .build())
        .withStorageConfig(HoodieStorageConfig.newBuilder()
            .parquetWriteLegacyFormat("false")
            .build())
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .withAsyncClean(false)
            .withAsyncIndex(false)
            .enable(true)
            .build())
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(IndexType.BLOOM)
            .build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(40, 60)
            .build())
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCompactionLazyBlockReadEnabled(true)
            .build())
        .build();
    client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
}
```
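If I am reading the builder calls correctly, the lock setup above should be equivalent to the following Hudi lock properties (the `<zk-quorum>`, `<databasePrefix>`, and `<tableName>` values are placeholders for what the code passes in at runtime):

```properties
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=<zk-quorum>
hoodie.write.lock.zookeeper.base_path=/<databasePrefix>
hoodie.write.lock.zookeeper.lock_key=<tableName>
hoodie.write.lock.zookeeper.connection_timeout_ms=50000
hoodie.write.lock.zookeeper.session_timeout_ms=1500000
hoodie.write.lock.wait_time_ms_between_retry=60000
```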
3. I have configured `HoodieFailedWritesCleaningPolicy.LAZY` and the `HoodieLockConfig`. The ZooKeeper log is as follows:
```
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:java.io.tmpdir=/tmp
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:java.compiler=<NA>
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:os.name=Linux
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:os.arch=amd64
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:os.version=4.19.91-25.8.al7.x86_64
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:user.name=hadoop
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:user.home=/home/hadoop
2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:user.dir=/mnt/disk4/yarn/nm-local-dir/usercache/root/appcache/application_1661840256099_1332/container_e01_1661840256099_1332_01_000002
2022-09-04 19:27:30,948 INFO  org.apache.zookeeper.ZooKeeper [] - Initiating client connection, connectString=*:2181,*:2181,*:2181 sessionTimeout=1500000 watcher=org.apache.curator.ConnectionState@18582146
2022-09-04 19:27:30,967 WARN  org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/disk3/yarn/nm-local-dir/usercache/root/appcache/application_1661840256099_1332/jaas-6909589205217539821.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2022-09-04 19:27:30,968 INFO  shadow.gs.org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider [] - ACQUIRING lock atZkBasePath = /gs_ods.db, lock key = ods_i_rcc_robot_state_report_v2
2022-09-04 19:27:30,969 INFO  org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server master-1-3.c-ac534405d725db11.cn-shanghai.emr.aliyuncs.com/*:2181
2022-09-04 19:27:30,969 INFO  org.apache.zookeeper.ClientCnxn [] - Socket connection established to master-1-3.c-ac534405d725db11.cn-shanghai.emr.aliyuncs.com/*:2181, initiating session
2022-09-04 19:27:30,975 INFO  org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server master-1-3.c-ac534405d725db11.cn-shanghai.emr.aliyuncs.com/*:2181, sessionid = 0x3008f4f8a0391be, negotiated timeout = 360000
```
4. The errors are as follows:
```
shadow.gs.org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220904192833325
	at shadow.gs.org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor.execute(JavaInsertCommitActionExecutor.java:47) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.HoodieJavaCopyOnWriteTable.insert(HoodieJavaCopyOnWriteTable.java:111) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.HoodieJavaCopyOnWriteTable.insert(HoodieJavaCopyOnWriteTable.java:82) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.client.HoodieJavaWriteClient.insert(HoodieJavaWriteClient.java:126) ~[robot-stream.jar:?]
	at com.robot.gs.sink.HudiSink.invoke(HudiSink.java:132) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) [robot-stream.jar:?]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) [robot-stream.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [robot-stream.jar:?]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [robot-stream.jar:?]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: shadow.gs.org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
	at shadow.gs.org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85) ~[robot-stream.jar:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_332]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) ~[?:1.8.0_332]
	at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:189) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:175) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseJavaCommitActionExecutor.java:345) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:126) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:68) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57) ~[robot-stream.jar:?]
	... 19 more
Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
	at shadow.gs.org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85) ~[robot-stream.jar:?]
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_332]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
	at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) ~[?:1.8.0_332]
	at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:189) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:175) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseJavaCommitActionExecutor.java:345) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:126) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:68) ~[robot-stream.jar:?]
	at shadow.gs.org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57) ~[robot-stream.jar:?]
```
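For context on what the bottom-most exception means: under optimistic concurrency control, Hudi's `SimpleConcurrentFileWritesConflictResolutionStrategy` compares the file groups touched by the committing write against those of writes that committed concurrently, and aborts when they overlap. With two sink subtasks inserting into the same table, overlapping file groups seem likely. A very rough, self-contained model of that check (this is not Hudi code; the class and method names here are made up for illustration):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ConflictSketch {
    // A write conflicts with a concurrently committed write when the
    // sets of file groups they touched intersect; the simple strategy
    // has no way to merge such writes, so one of them must fail.
    static boolean hasConflict(Set<String> currentWrite, Set<String> committedWrite) {
        Set<String> overlap = new HashSet<>(currentWrite);
        overlap.retainAll(committedWrite);
        return !overlap.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> writerA = new HashSet<>(Arrays.asList("fg-1", "fg-2"));
        Set<String> writerB = new HashSet<>(Arrays.asList("fg-2", "fg-3"));
        Set<String> writerC = new HashSet<>(Arrays.asList("fg-4"));

        // Two subtasks landing records in the same file group conflict...
        System.out.println(hasConflict(writerA, writerB)); // true
        // ...while writers on disjoint file groups commit cleanly.
        System.out.println(hasConflict(writerA, writerC)); // false
    }
}
```

As far as I understand, this means multi-writer OCC only helps when concurrent writers touch disjoint partitions/file groups; with sink parallelism 2 on the same keyspace, conflicts like the one above are expected rather than a lock misconfiguration.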