[ https://issues.apache.org/jira/browse/HUDI-3444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
qian heng closed HUDI-3444.
---------------------------
    Fix Version/s: 0.11.0
       Resolution: Fixed

> NPE occurred when ingesting data to a MOR table by Flink
> ---------------------------------------------------------
>
>                 Key: HUDI-3444
>                 URL: https://issues.apache.org/jira/browse/HUDI-3444
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: flink
>    Affects Versions: 0.10.0
>            Reporter: qian heng
>            Priority: Major
>             Fix For: 0.11.0
>
>
> *Describe the problem you faced*
>
> An NPE occurred when ingesting data into a MOR table with Flink.
> From the stack trace, the NPE is thrown while Hudi attempts to close a
> HoodieAppendHandle and flush all of its remaining data to a log file.
> (The line numbers may differ slightly because some extra logging was added.)
> {code:java}
> org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20220216172134086
>   at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:79)
>   at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:49)
>   at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:72)
>   at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:167)
>   at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$6(StreamWriteFunction.java:548)
>   at org.apache.hudi.sink.StreamWriteFunction.flushBucket(StreamWriteFunction.java:836)
>   at org.apache.hudi.sink.StreamWriteFunction.bufferRecord(StreamWriteFunction.java:788)
>   at org.apache.hudi.sink.StreamWriteFunction.processElement(StreamWriteFunction.java:298)
>   at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:264)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:257)
>   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:426)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:688)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:643)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:654)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:627)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:831)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:612)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
>   at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
>   at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
>   at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:114)
>   at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:70)
>   at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:72)
>   ... 22 more
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
>   at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:106)
>   at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:43)
>   at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
>   ... 26 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:188)
>   at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:102)
>   ... 28 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.hudi.common.table.log.HoodieLogFormat$Writer.getLogFile()" because "this.writer" is null
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:182)
>   ... 29 more
> Caused by: java.lang.NullPointerException
>   at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:387)
>   at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:416)
>   at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:94)
>   at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:115)
>   at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:80)
>   at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:41)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:162)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   ... 1 more
> {code}
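>
> The innermost "Caused by" frames show the NPE being thrown while close()
> flushes the handle's remaining records through appendDataAndDeleteBlocks(),
> at the point where the log writer is dereferenced. A minimal standalone
> sketch of that failure mode (AppendHandle and LogWriter below are
> hypothetical stand-ins, not the real Hudi classes):
> {code:java}
> // Hypothetical sketch: a handle whose close() releases its writer will
> // throw an NPE if the same instance is written to and closed again.
> class LogWriter {
>   String getLogFile() {
>     return "partition/.fileId_20220216172134086.log.1"; // made-up name
>   }
> }
>
> class AppendHandle {
>   private LogWriter writer = new LogWriter();
>
>   void write(String record) {
>     // Records are only buffered here; flushing happens in close().
>   }
>
>   String close() {
>     // Flushing dereferences the writer, so a second close() on an
>     // already-closed handle fails exactly like the stack trace above.
>     String logFile = writer.getLogFile();
>     writer = null; // the handle is now unusable
>     return logFile;
>   }
> }
>
> public class ReuseClosedHandle {
>   public static void main(String[] args) {
>     AppendHandle handle = new AppendHandle();
>     handle.write("record-1");
>     handle.close();   // first close succeeds and clears the writer
>     handle.write("record-2");
>     handle.close();   // NullPointerException: "writer" is null
>   }
> }
> {code}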
> Through the code, I think this NPE is caused by this position in
> BaseFlinkDeltaCommitActionExecutor.java:
> {code:java}
> @Override
> public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
>   return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime,
>       table, idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle));
> }
> {code}
> BaseFlinkDeltaCommitActionExecutor uses ExplicitWriteHandleFactory as its
> writeHandleFactory, which leads to the same HoodieAppendHandle being reused
> in CopyOnWriteInsertHandler:
> {code:java}
> if (!handle.canWrite(payload.record)) {
>   // Handle is full. Close the handle and add the WriteStatus
>   statuses.addAll(handle.close());
>   // Open new handle
>   handle = writeHandleFactory.create(config, instantTime, hoodieTable,
>       insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
>   handles.put(partitionPath, handle);
> }
> handle.write(insertPayload, payload.insertValue, payload.exception);
> {code}
> Thus, once a log file is full and its handle is closed, the writer inside
> that HoodieAppendHandle is also cleared. Because ExplicitWriteHandleFactory
> hands back the same instance, the "new" handle is the already-closed one,
> and using it for the next record leads to the NPE on the writer.
>
> *To Reproduce*
>
> Steps to reproduce the behavior:
> 1. Use Flink to ingest data into a MOR table.
> 2. Set hoodie.parquet.max.file.size to a small value, so that
> {{handle.canWrite(payload.record)}} becomes false easily and the handle is
> closed (a hedged SQL sketch is given at the end of this report).
> 3. When the next record is consumed, the NPE occurs.
>
> *Expected behavior*
>
> Records are ingested into the MOR table without an NPE.
>
> *Environment Description*
> * Hudi version : 0.10.0
> * Flink version : 1.13.1
> * Storage (HDFS/S3/GCS..) : HDFS
> * Running on Docker? (yes/no) : no
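>
> To make the reproduce steps concrete, here is a hedged sketch of steps 1
> and 2 as Flink SQL driven from Java. The table names, schema, and path are
> made up, and forwarding the raw hoodie.parquet.max.file.size key through
> the WITH clause is an assumption; adjust it to however your connector
> version exposes the option.
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class MorNpeRepro {
>   public static void main(String[] args) {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>     // A generated source so the sink keeps receiving records.
>     tEnv.executeSql(
>         "CREATE TABLE src (id INT, name STRING, ts AS LOCALTIMESTAMP) "
>             + "WITH ('connector' = 'datagen', 'rows-per-second' = '1000')");
>
>     // Step 1: a MOR table. Step 2: a small max file size so that
>     // handle.canWrite(...) turns false after very little data.
>     // (Whether the raw hoodie.* key is honored here is an assumption.)
>     tEnv.executeSql(
>         "CREATE TABLE hudi_mor_sink (id INT PRIMARY KEY NOT ENFORCED, name STRING, ts TIMESTAMP(3)) WITH ("
>             + " 'connector' = 'hudi',"
>             + " 'path' = 'hdfs:///tmp/hudi/mor_npe_repro',"
>             + " 'table.type' = 'MERGE_ON_READ',"
>             + " 'hoodie.parquet.max.file.size' = '1048576')");
>
>     // Step 3: once the first log file fills up and its handle is closed,
>     // the next record should hit the NPE described above.
>     tEnv.executeSql("INSERT INTO hudi_mor_sink SELECT id, name, ts FROM src");
>   }
> }
> {code}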
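>
> One possible direction for a fix, as a hypothetical sketch (illustrative
> names, not Hudi APIs, and not necessarily the patch that actually shipped
> in 0.11.0): have the factory stop handing back a handle once it has been
> closed, so the consumer never writes through a cleared writer.
> {code:java}
> // Hypothetical sketch of a handle factory that re-creates the handle
> // instead of unconditionally returning the same explicit instance.
> class ClosableAppendHandle {
>   private boolean closed = false;
>
>   void write(String record) {
>     if (closed) {
>       throw new IllegalStateException("handle already closed");
>     }
>     // buffer the record ...
>   }
>
>   void close() {
>     // flush remaining data and release the log writer
>     closed = true;
>   }
>
>   boolean isClosed() {
>     return closed;
>   }
> }
>
> class RecreatingHandleFactory {
>   private ClosableAppendHandle handle;
>
>   RecreatingHandleFactory(ClosableAppendHandle explicitHandle) {
>     this.handle = explicitHandle;
>   }
>
>   ClosableAppendHandle create() {
>     // Unlike a factory that always returns its captured instance,
>     // hand out a fresh handle once the previous one has been closed.
>     if (handle == null || handle.isClosed()) {
>       handle = new ClosableAppendHandle();
>     }
>     return handle;
>   }
> }
> {code}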