[ https://issues.apache.org/jira/browse/FLINK-35343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865452#comment-17865452 ]
Shekhar Prasad Rajak commented on FLINK-35343:
----------------------------------------------

I would like to work on this issue.

> NullPointerException in SourceReaderBase
> ----------------------------------------
>
>                 Key: FLINK-35343
>                 URL: https://issues.apache.org/jira/browse/FLINK-35343
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.17.2
>         Environment: * flink(1.17.2), local mode and deploy on k8s
> * doris-flink-connector-1.17(1.6.0)
> * Doris(2.1)
>            Reporter: zyh
>            Priority: Blocker
>         Attachments: flinktask.png, servicelog.png
>
>
> h2. operation
> I used Flink batch mode to read data from Doris and write it back to Doris.
> The Flink job includes two source tasks, one table join task, and one sink task,
> like this:
> source: Table A
> source: Table B
> hashjoin: c = a join b
> sink: c
>
> h2. table properties
> h3. source
> properties.put("connector", "doris");
> properties.put("fenodes", inputDataSource.getHttpUrl());
> properties.put("table.identifier", inputDataSource.getDatabase() + "." + sourceTable.getName());
> properties.put("username", inputDataSource.getUsername());
> properties.put("password", inputDataSource.getPassword());
> h3. sink
> properties.put("connector", "doris");
> properties.put("fenodes", dataExplore.getOutputDataSource().getHttpUrl());
> properties.put("table.identifier", dataExplore.getOutputDataSource().getDatabase() + "." + tableName);
> properties.put("username", dataExplore.getOutputDataSource().getUsername());
> properties.put("password", dataExplore.getOutputDataSource().getPassword());
> properties.put("sink.properties.format", "csv");
> // column separator
> properties.put("sink.properties.column_separator", "#cs_");
> // line delimiter
> properties.put("sink.properties.line_delimiter", "#ld_");
> properties.put("sink.label-prefix", "doris_label" + UUID.randomUUID());
> properties.put("sink.parallelism", "2");
>
> h2. exception stack
> {code:java}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:194)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:208)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:173)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> h2. other
> The problem only occurs in Flink local mode and when deployed on k8s.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)