[
https://issues.apache.org/jira/browse/HUDI-3723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Raymond Xu updated HUDI-3723:
-----------------------------
Status: Open (was: In Progress)
> MOR MergeOnRead FilteringIterator StackOverflow error
> -----------------------------------------------------
>
> Key: HUDI-3723
> URL: https://issues.apache.org/jira/browse/HUDI-3723
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: sivabalan narayanan
> Assignee: Alexey Kudinkin
> Priority: Blocker
> Fix For: 0.11.0
>
>
> We run integration tests against Hudi at a regular cadence, and recently we have
> been seeing a StackOverflowError with the MOR table for the Spark long-running yaml.
>
> {code:java}
> 22/03/26 14:27:04 INFO ValidateDatasetNode: Validate data in target hudi path basaePath/*/*/*
> 22/03/26 14:28:51 ERROR Executor: Exception in task 2.0 in stage 975.0 (TID 17933)
> java.lang.StackOverflowError
>     at java.util.HashMap.removeNode(HashMap.java:821)
>     at java.util.HashMap.remove(HashMap.java:800)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:238)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.remove(ExternalSpillableMap.java:55)
>     at scala.collection.convert.Wrappers$JMapWrapperLike.remove(Wrappers.scala:296)
>     at scala.collection.convert.Wrappers$JMapWrapperLike.remove$(Wrappers.scala:296)
>     at scala.collection.convert.Wrappers$JMapWrapper.remove(Wrappers.scala:317)
>     at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.removeLogRecord(HoodieMergeOnReadRDD.scala:187)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:262)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>     at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.hasNext(HoodieMergeOnReadRDD.scala:271)
>     ...
> {code}
> This repeats for some time and the job eventually fails.
>
> Likely the root cause is: in our iterator, when we encounter a delete record,
> we call hasNext() again so that we skip the current record and move on to the
> next one. But each such call pushes another frame onto the call stack, so if
> this repeats for 8k or more times and the stack size in the corresponding JVM
> is small, our test will fail. In reality, there could be millions of delete
> records, so we need to find a way to fix this. For now, we are temporarily
> experimenting with the "-Xss100m" java option to increase the stack size of
> the JVM.
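>
> As a side note, a sketch of how that temporary workaround could be passed to
> the driver and executors if the test is launched via spark-submit (the actual
> launch command of the integration test may differ):
> {code}
> # temporary workaround only: raise the JVM thread stack size on driver and executors
> spark-submit \
>   --driver-java-options "-Xss100m" \
>   --conf "spark.executor.extraJavaOptions=-Xss100m" \
>   <remaining integration-test arguments>
> {code}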
>
> The code snippet from HoodieMergeOnReadRDD is below; the problematic lines are
> {code:java}
> if (mergedAvroRecordOpt.isEmpty) {
>   // Record has been deleted, skipping
>   this.hasNext
> {code}
>
> {code:java}
> override def hasNext: Boolean = {
>   if (baseFileIterator.hasNext) {
>     val curRowRecord = baseFileIterator.next()
>     val curKey = curRowRecord.getString(recordKeyOrdinal)
>     val updatedRecordOpt = removeLogRecord(curKey)
>     if (updatedRecordOpt.isEmpty) {
>       // No merge needed, load current row with required projected schema
>       recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord,
>         requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
>       true
>     } else {
>       val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
>       if (mergedAvroRecordOpt.isEmpty) {
>         // Record has been deleted, skipping
>         this.hasNext
>       } else {
>         // NOTE: In occurrence of a merge we can't know the schema of the record
>         //       being returned, b/c record from the Delta Log will bear (full)
>         //       Table schema, while record from the Base file might already be
>         //       read in projected one (as an optimization).
>         //       As such we can't use more performant [[projectAvroUnsafe]], and
>         //       instead have to fallback to [[projectAvro]]
>         val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get,
>           requiredAvroSchema, recordBuilder)
>         recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
>         true
>       }
>     }
>   } else {
>     super[LogFileIterator].hasNext
>   }
> }
> {code}
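>
> One possible direction for a fix (a minimal sketch only, not the actual patch;
> it reuses the members from the snippet above such as baseFileIterator,
> removeLogRecord, merge, serialize, deserialize, projectAvro and
> unsafeProjection): replace the recursive this.hasNext call with an explicit
> loop, so that a long run of delete records only advances the loop instead of
> growing the call stack.
> {code:java}
> override def hasNext: Boolean = {
>   while (baseFileIterator.hasNext) {
>     val curRowRecord = baseFileIterator.next()
>     val curKey = curRowRecord.getString(recordKeyOrdinal)
>     val updatedRecordOpt = removeLogRecord(curKey)
>     if (updatedRecordOpt.isEmpty) {
>       // No merge needed, load current row with required projected schema
>       recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord,
>         requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
>       return true
>     } else {
>       val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
>       if (mergedAvroRecordOpt.isDefined) {
>         // Same projection path as in the original snippet for merged records
>         val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get,
>           requiredAvroSchema, recordBuilder)
>         recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
>         return true
>       }
>       // Record has been deleted: fall through and pull the next base-file
>       // record on the next loop iteration instead of recursing into hasNext.
>     }
>   }
>   // Base file exhausted; defer to the log-file iterator for remaining records.
>   super[LogFileIterator].hasNext
> }
> {code}
> Bumping -Xss only delays the failure, since the recursion depth grows with the
> number of consecutive deletes, whereas the loop form keeps the stack depth
> constant.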
--
This message was sent by Atlassian Jira
(v8.20.1#820001)