[ https://issues.apache.org/jira/browse/HUDI-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

qian heng closed HUDI-3444.
---------------------------
    Fix Version/s: 0.11.0
       Resolution: Fixed

> NPE occurred when ingesting data to MOR table by Flink
> ------------------------------------------------------
>
>                 Key: HUDI-3444
>                 URL: https://issues.apache.org/jira/browse/HUDI-3444
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>    Affects Versions: 0.10.0
>            Reporter: qian heng
>            Priority: Major
>             Fix For: 0.11.0
>
>
> *Describe the problem you faced*
> An NPE occurred when ingesting data into a MOR table with Flink.
> From the stack trace, the NPE occurs when Hudi attempts to close the
> HoodieAppendHandle and flush all remaining data in the handle to a log
> file. (Line numbers may differ slightly because some extra logging was
> added locally.)
> {code:java}
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit 
> time 20220216172134086
> at 
> org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:79)
> at 
> org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:49)
> at 
> org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:72)
> at 
> org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:167)
> at 
> org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$6(StreamWriteFunction.java:548)
> at 
> org.apache.hudi.sink.StreamWriteFunction.flushBucket(StreamWriteFunction.java:836)
> at 
> org.apache.hudi.sink.StreamWriteFunction.bufferRecord(StreamWriteFunction.java:788)
> at 
> org.apache.hudi.sink.StreamWriteFunction.processElement(StreamWriteFunction.java:298)
> at 
> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:264)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:257)
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:426)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:688)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:643)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:654)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:627)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:831)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:612)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: 
> org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieException: 
> java.util.concurrent.ExecutionException: java.lang.NullPointerException: 
> Cannot invoke 
> "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" 
> because "this.writer" is null
> at 
> org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
> at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
> at 
> org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:114)
> at 
> org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:70)
> at 
> org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:72)
> ... 22 more
> Caused by: org.apache.hudi.exception.HoodieException: 
> org.apache.hudi.exception.HoodieException: 
> java.util.concurrent.ExecutionException: java.lang.NullPointerException: 
> Cannot invoke 
> "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" 
> because "this.writer" is null
> at 
> org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:106)
> at 
> org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:43)
> at 
> org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
> ... 26 more
> Caused by: org.apache.hudi.exception.HoodieException: 
> java.util.concurrent.ExecutionException: java.lang.NullPointerException: 
> Cannot invoke 
> "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" 
> because "this.writer" is null
> at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:188)
> at 
> org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:102)
> ... 28 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.NullPointerException: Cannot invoke 
> "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" 
> because "this.writer" is null
> at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
> at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:182)
> ... 29 more
> Caused by: java.lang.NullPointerException
> at 
> org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:387)
> at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:416)
> at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:94)
> at 
> org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:115)
> at 
> org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:80)
> at 
> org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:41)
> at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:162)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
> {code}
> Reading the code, I think this NPE is caused at this position in
> BaseFlinkDeltaCommitActionExecutor.java:
> {code:java}
> @Override
> public Iterator<List<WriteStatus>> handleInsert(String idPfx,
>     Iterator<HoodieRecord<T>> recordItr) {
>   return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime,
>       table, idPfx, taskContextSupplier,
>       new ExplicitWriteHandleFactory(writeHandle));
> }
> {code}
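> For context, ExplicitWriteHandleFactory essentially ignores the arguments
> to create() and always returns the handle it was constructed with. A
> paraphrased sketch (generics elided; not the verbatim Hudi source):
> {code:java}
> // Paraphrased from org.apache.hudi.io.ExplicitWriteHandleFactory.
> public class ExplicitWriteHandleFactory extends WriteHandleFactory {
>   private final HoodieWriteHandle writeHandle;
> 
>   public ExplicitWriteHandleFactory(HoodieWriteHandle writeHandle) {
>     this.writeHandle = writeHandle;
>   }
> 
>   @Override
>   public HoodieWriteHandle create(HoodieWriteConfig config, String instantTime,
>       HoodieTable hoodieTable, String partitionPath, String idPrefix,
>       TaskContextSupplier taskContextSupplier) {
>     // Always the same instance, regardless of the arguments.
>     return writeHandle;
>   }
> }
> {code}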
> BaseFlinkDeltaCommitActionExecutor uses ExplicitWriteHandleFactory as the
> writeHandleFactory, which leads to reuse of the same HoodieAppendHandle in
> CopyOnWriteInsertHandler:
> {code:java}
> if (!handle.canWrite(payload.record)) {
>   // Handle is full. Close the handle and add the WriteStatus
>   statuses.addAll(handle.close());
>   // Open new handle
>   handle = writeHandleFactory.create(config, instantTime, hoodieTable,
>       insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
>   handles.put(partitionPath, handle);
> }
> handle.write(insertPayload, payload.insertValue, payload.exception);
> {code}
> Thus, once a log file is full, its handle is closed, which also clears the
> writer inside the HoodieAppendHandle. Because the factory hands back the
> same HoodieAppendHandle, the next write goes through the closed handle and
> hits an NPE on the null writer. The self-contained sketch below illustrates
> the pattern.
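> To make the failure mode concrete, here is a small, self-contained Java
> demo of the pattern (the Handle class, its fields, and the record sizes
> are illustrative, not Hudi's actual types):
> {code:java}
> import java.util.function.Supplier;
> 
> class Handle {
>   // Stands in for HoodieLogFormat$Writer; close() nulls it out.
>   private StringBuilder writer = new StringBuilder();
> 
>   boolean canWrite() { return writer != null && writer.length() < 8; }
>   void write(String record) { writer.append(record); } // NPE once writer == null
>   void close() { writer = null; }
> }
> 
> public class ExplicitFactoryBug {
>   public static void main(String[] args) {
>     Handle explicit = new Handle();
>     // "ExplicitWriteHandleFactory": create() always returns the same handle.
>     Supplier<Handle> factory = () -> explicit;
> 
>     Handle handle = factory.get();
>     for (String record : new String[] {"aaaa", "bbbb", "cccc"}) {
>       if (!handle.canWrite()) {
>         handle.close();         // handle is full: close it ...
>         handle = factory.get(); // ... and "open a new one" -- same closed handle
>       }
>       // Third record throws: Cannot invoke "StringBuilder.append(String)"
>       // because "this.writer" is null -- the same shape as the reported NPE.
>       handle.write(record);
>     }
>   }
> }
> {code}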
> *To Reproduce*
> Steps to reproduce the behavior:
> 1. Use Flink to ingest data into a MOR table.
> 2. Set hoodie.parquet.max.file.size to a small value, so that
> handle.canWrite(payload.record) quickly returns false and the handle is
> closed (see the job sketch after this list).
> 3. When the next record is consumed, the NPE occurs.
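> A hedged sketch of such a job (the table names, schema, and path are
> illustrative; the option keys follow the Hudi 0.10 Flink docs):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> 
> public class MorNpeRepro {
>   public static void main(String[] args) {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> 
>     // MOR sink with a deliberately tiny max file size (1 MB) so that
>     // canWrite() flips to false after a handful of records.
>     tEnv.executeSql(
>         "CREATE TABLE hudi_mor_sink (\n"
>             + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
>             + "  name VARCHAR(10),\n"
>             + "  ts TIMESTAMP(3)\n"
>             + ") WITH (\n"
>             + "  'connector' = 'hudi',\n"
>             + "  'path' = 'hdfs:///tmp/hudi/mor_npe_repro',\n"
>             + "  'table.type' = 'MERGE_ON_READ',\n"
>             + "  'hoodie.parquet.max.file.size' = '1048576'\n"
>             + ")");
> 
>     // A steady datagen stream keeps the append handle filling up.
>     tEnv.executeSql(
>         "CREATE TABLE src (uuid VARCHAR(20), name VARCHAR(10), ts TIMESTAMP(3))\n"
>             + "WITH ('connector' = 'datagen', 'rows-per-second' = '1000')");
>     tEnv.executeSql("INSERT INTO hudi_mor_sink SELECT uuid, name, ts FROM src");
>   }
> }
> {code}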
> *Expected behavior*
> Data is ingested into the MOR table without an NPE, even after a full
> write handle is closed and rolled over to a new one.
> *Environment Description*
>  * Hudi version : 0.10.0
>  * Flink version : 1.13.1
>  * Storage (HDFS/S3/GCS..) : hdfs
>  * Running on Docker? (yes/no) : no



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
