[ https://issues.apache.org/jira/browse/HUDI-3058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504918#comment-17504918 ]
zhangyingjie commented on HUDI-3058: ------------------------------------ When I run two spark jobs to write to the same hudi table, I encounter the same error: py4j.protocol.Py4JJavaError: An error occurred while calling o92.save. : org.apache.hudi.exception.HoodieWriteConflictException: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes at org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy.resolveConflict(SimpleConcurrentFileWritesConflictResolutionStrategy.java:102) at org.apache.hudi.client.utils.TransactionUtils.lambda$resolveWriteConflictIfAny$0(TransactionUtils.java:73) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:742) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) at org.apache.hudi.client.utils.TransactionUtils.resolveWriteConflictIfAny(TransactionUtils.java:67) at org.apache.hudi.client.SparkRDDWriteClient.preCommit(SparkRDDWriteClient.java:502) at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:196) at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:125) at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:635) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:286) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126) at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.ConcurrentModificationException: Cannot resolve conflicts for overlapping writes ... 44 more > SqlQueryEqualityPreCommitValidator errors with > java.util.ConcurrentModificationException > ---------------------------------------------------------------------------------------- > > Key: HUDI-3058 > URL: https://issues.apache.org/jira/browse/HUDI-3058 > Project: Apache Hudi > Issue Type: Bug > Components: Usability > Affects Versions: 0.10.0 > Reporter: sivabalan narayanan > Assignee: satish > Priority: Major > Labels: pull-request-available, sev:high > Fix For: 0.11.0 > > > Ref issue: [https://github.com/apache/hudi/issues/4109] > > Faced a ConcurrentModificationException when trying to test > SqlQueryEqualityPreCommitValidator in quickstart guide > *To Reproduce* > Steps to reproduce the behavior: > # Insert data without any pre commit validations > # Update data (ensured the updates don't touch the fare column in quickstart > example) with the following precommit validator props > {{option("hoodie.precommit.validators", > "org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator"). 
> option("hoodie.precommit.validators.equality.sql.queries", "select sum(fare) > from <TABLE_NAME>").}} > stacktrace: > {code:java} > org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit > time 20211124114945342 > at > org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62) > at > org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:46) > at > org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:111) > at > org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:95) > at > org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:174) > at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214) > at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:276) > at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164) > at > org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > at > 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) > at > org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) > at > org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) > at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) > at > org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229) > ... 70 elided > Caused by: java.util.ConcurrentModificationException > at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1633) > at > java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:743) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) > at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272) > at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > at > 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) > at > org.apache.hudi.client.utils.SparkValidatorUtils.getRecordsFromPendingCommits(SparkValidatorUtils.java:159) > at > org.apache.hudi.client.utils.SparkValidatorUtils.runValidators(SparkValidatorUtils.java:78) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.runPrecommitValidators(BaseSparkCommitActionExecutor.java:401) > at > org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:140) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:267) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:182) > at > org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:82) > at > org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55) > ... 98 more {code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)