QuChunhe opened a new issue, #6584:
URL: https://github.com/apache/hudi/issues/6584

   The Hudi Java client does not support multi-writer concurrency and throws the error "Cannot resolve conflicts for overlapping writes".
   
   1. Hudi version 0.12.0, Aliyun OSS file system, Flink 1.13.6, Hudi sink parallelism 2.
   
   2. The Hudi Java client configuration is as follows:
   
   
      public HudiSink setZookeeperLock(String zkQuorum) {
        if (Objects.isNull(zkQuorum)) {
          return this;
        }
        writeConcurrencyMode = WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
        hoodieFailedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
        hoodieLockConfig = HoodieLockConfig.newBuilder()
            .withZkQuorum(zkQuorum)
            .withZkBasePath("/" + databasePrefix)
            .withZkLockKey(tableName)
            .withZkConnectionTimeoutInMs(50L * 1000L)
            .withZkSessionTimeoutInMs(1500 * 1000L)
            .withRetryWaitTimeInMillis(60L * 1000L)
            .build();
        return this;
      }
   
   
      @Override
      public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        String tablePath = warehousePath + databasePrefix + "/" + tableName;
        // initialize the table, if not done already
        Path path = new Path(tablePath);
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);

        String schema = Util.getStringFromResource("/" + databasePrefix + "." + tableName + ".schema");

        // Create the write client to write some records in
        HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
            .withPath(tablePath)
            .withSchema(schema)
            .forTable(tableName)
            .withAutoCommit(true)
            .withTableServicesEnabled(true)
            .withEmbeddedTimelineServerEnabled(true)
            .withMarkersType(MarkerType.TIMELINE_SERVER_BASED.name())
            .withRollbackUsingMarkers(true)
            .withDeleteParallelism(parallelism)
            .withParallelism(parallelism, parallelism)
            .withFinalizeWriteParallelism(parallelism)
            .withRollbackParallelism(parallelism / 2)
            .withWriteBufferLimitBytes(32 * 1024 * 1024)
            .withWriteConcurrencyMode(writeConcurrencyMode)
            .withLockConfig(hoodieLockConfig)
            //.withEngineType(EngineType.SPARK)
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .withAutoClean(true)
                .withFailedWritesCleaningPolicy(hoodieFailedWritesCleaningPolicy)
                .withAsyncClean(false)
                .build())
            .withStorageConfig(
                HoodieStorageConfig.newBuilder()
                    .parquetWriteLegacyFormat("false")
                    .build())
            .withMetadataConfig(
                HoodieMetadataConfig.newBuilder()
                    .withAsyncClean(false)
                    .withAsyncIndex(false)
                    .enable(true)
                    .build())
            .withIndexConfig(
                HoodieIndexConfig.newBuilder()
                    .withIndexType(IndexType.BLOOM)
                    .build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
                .archiveCommitsWith(40, 60)
                .build())
            .withCompactionConfig(
                HoodieCompactionConfig.newBuilder()
                    .withCompactionLazyBlockReadEnabled(true)
                    .build())
            .build();
        client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
      }
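   Under optimistic concurrency control, a "Cannot resolve conflicts for overlapping writes" failure is the expected outcome when two writers commit to the same file group; the losing writer is supposed to retry its batch under a new commit time rather than treat the exception as fatal. As a minimal sketch of that retry pattern (this is an assumption about how a caller might wrap `client.insert`, not code from the issue; the real exception type would be `org.apache.hudi.exception.HoodieWriteConflictException`, stubbed here as a plain `RuntimeException` so the snippet compiles without Hudi on the classpath, and `retryOnConflict` is a hypothetical helper name):

   ```java
   import java.util.function.Supplier;

   public class ConflictRetry {
     // Stand-in for org.apache.hudi.exception.HoodieWriteConflictException,
     // stubbed so this sketch is self-contained.
     static class WriteConflictException extends RuntimeException {}

     // Retry a write up to maxAttempts times when it fails with a conflict.
     // In a real sink, each attempt would start a fresh commit time.
     static <T> T retryOnConflict(Supplier<T> write, int maxAttempts) {
       WriteConflictException last = null;
       for (int attempt = 1; attempt <= maxAttempts; attempt++) {
         try {
           return write.get();
         } catch (WriteConflictException e) {
           last = e; // another writer won the race; try again
         }
       }
       throw last; // all attempts conflicted; surface the failure
     }

     public static void main(String[] args) {
       // Simulate a write that conflicts twice, then succeeds on the third try.
       int[] calls = {0};
       String result = retryOnConflict(() -> {
         if (++calls[0] < 3) throw new WriteConflictException();
         return "committed";
       }, 5);
       System.out.println(result + " after " + calls[0] + " attempts");
     }
   }
   ```

   Whether this helps here depends on whether the two sink subtasks really write disjoint file groups; with BLOOM indexing and parallelism 2, overlapping writes to the same file group will conflict on every retry.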
   
   
   3. I have configured HoodieFailedWritesCleaningPolicy.LAZY and HoodieLockConfig. The ZooKeeper log is as follows:
   
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:java.io.tmpdir=/tmp
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:java.compiler=<NA>
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:os.name=Linux
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:os.arch=amd64
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:os.version=4.19.91-25.8.al7.x86_64
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:user.name=hadoop
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:user.home=/home/hadoop
   2022-09-04 19:27:30,947 INFO  org.apache.zookeeper.ZooKeeper [] - Client environment:user.dir=/mnt/disk4/yarn/nm-local-dir/usercache/root/appcache/application_1661840256099_1332/container_e01_1661840256099_1332_01_000002
   2022-09-04 19:27:30,948 INFO  org.apache.zookeeper.ZooKeeper [] - Initiating client connection, connectString=*:2181,*:2181,*:2181 sessionTimeout=1500000 watcher=org.apache.curator.ConnectionState@18582146
   2022-09-04 19:27:30,967 WARN  org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/disk3/yarn/nm-local-dir/usercache/root/appcache/application_1661840256099_1332/jaas-6909589205217539821.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
   2022-09-04 19:27:30,968 INFO  shadow.gs.org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider [] - ACQUIRING lock atZkBasePath = /gs_ods.db, lock key = ods_i_rcc_robot_state_report_v2
   2022-09-04 19:27:30,969 INFO  org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server master-1-3.c-ac534405d725db11.cn-shanghai.emr.aliyuncs.com/*:2181
   2022-09-04 19:27:30,969 INFO  org.apache.zookeeper.ClientCnxn [] - Socket connection established to master-1-3.c-ac534405d725db11.cn-shanghai.emr.aliyuncs.com/*:2181, initiating session
   2022-09-04 19:27:30,975 INFO  org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server master-1-3.c-ac534405d725db11.cn-shanghai.emr.aliyuncs.com/*:2181, sessionid = 0x3008f4f8a0391be, negotiated timeout = 360000
   
   4. The error stack trace is as follows:
   
   shadow.gs.org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220904192833325
           at shadow.gs.org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor.execute(JavaInsertCommitActionExecutor.java:47) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.HoodieJavaCopyOnWriteTable.insert(HoodieJavaCopyOnWriteTable.java:111) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.HoodieJavaCopyOnWriteTable.insert(HoodieJavaCopyOnWriteTable.java:82) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.HoodieJavaWriteClient.insert(HoodieJavaWriteClient.java:126) ~[robot-stream.jar:?]
           at com.robot.gs.sink.HudiSink.invoke(HudiSink.java:132) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) ~[robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) [robot-stream.jar:?]
           at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) [robot-stream.jar:?]
           at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) [robot-stream.jar:?]
           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [robot-stream.jar:?]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
   Caused by: shadow.gs.org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
           at shadow.gs.org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85) ~[robot-stream.jar:?]
           at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_332]
           at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
           at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
           at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) ~[?:1.8.0_332]
           at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:189) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:175) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseJavaCommitActionExecutor.java:345) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:126) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:68) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57) ~[robot-stream.jar:?]
           ... 19 more
   Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes
           at shadow.gs.org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:85) ~[robot-stream.jar:?]
           at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_332]
           at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
           at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) ~[?:1.8.0_332]
           at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) ~[?:1.8.0_332]
           at shadow.gs.org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:79) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:189) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:175) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseJavaCommitActionExecutor.java:345) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:126) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor.execute(BaseJavaCommitActionExecutor.java:68) ~[robot-stream.jar:?]
           at shadow.gs.org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:57) ~[robot-stream.jar:?]

