[ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154966#comment-16154966 ]
Till Rohrmann edited comment on FLINK-7430 at 9/6/17 8:02 AM:
--------------------------------------------------------------

Thanks for reporting this issue [~pete]. I think the underlying problem is not strictly FLINK-6833 but rather that the {{ContinuousFileReaderOperator}} does not adhere to the {{StreamOperator}} lifecycle model.

The {{ContinuousFileReaderOperator}} consumes input splits from an upstream source and hands them to another thread running the {{SplitReader}}, which does the actual file reading. The problem is that the {{ContinuousFileReaderOperator}} and the {{SplitReader}} are only loosely coupled, and the former does not make sure that the latter sticks to the lifecycle of the {{StreamTask}}. As a consequence, the {{ContinuousFileReaderOperator}} can finish processing, which switches the underlying {{StreamTask}} to {{isRunning == false}}, even though the {{SplitReader}} is still processing input splits. If the {{SplitReader}} now wants to fail the {{StreamTask}} because it encountered a problem, the exception won't be propagated because the {{StreamTask}} is already in the state {{isRunning == false}}.

One solution could be to move the {{isRunning}} guard from the {{handleAsyncException}} method to its caller ({{StreamTask.java:932}}), similar to {{StreamTask#triggerCheckpoint}} and {{StreamTask#performCheckpoint}}. Another solution could be to properly couple the {{SplitReader}} to the lifecycle of the underlying {{StreamTask}}. The loose coupling also has other implications, e.g. it thwarts proper checkpointing, because every {{StreamTask}} with {{isRunning == false}} will abort an incoming checkpoint.

Maybe [~aljoscha] or [~kkl0u] can chime in.
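For illustration, here is a minimal, self-contained sketch of the race described above. It is not Flink's actual code; the class {{SwallowedAsyncExceptionSketch}} and its members are made up for this example. A background reader thread fails after the main task has already switched to {{isRunning == false}}, so the guarded async exception handler silently drops the error:

{code:java}
import java.util.concurrent.atomic.AtomicReference;

// Simplified illustration of the lifecycle race described above (not Flink code).
public class SwallowedAsyncExceptionSketch {

    private volatile boolean isRunning = true;
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    // Analogous to an async exception handler that is guarded by isRunning:
    // once the task has left the running state, the error is silently dropped.
    void handleAsyncException(String message, Throwable t) {
        if (isRunning) {
            asyncError.compareAndSet(null, t);
        }
        // else: the exception is swallowed
    }

    void run() throws Exception {
        Thread splitReader = new Thread(() -> {
            try {
                Thread.sleep(100); // still "reading splits"
                throw new IllegalStateException("read failed");
            } catch (InterruptedException ignored) {
                // ignored for brevity
            } catch (RuntimeException e) {
                handleAsyncException("async reader failed", e);
            }
        });
        splitReader.start();

        // The "operator" finishes immediately; nothing couples it to the reader thread.
        isRunning = false;

        splitReader.join();
        if (asyncError.get() != null) {
            throw new Exception("job failed", asyncError.get());
        }
        System.out.println("job 'succeeded' although the reader failed");
    }

    public static void main(String[] args) throws Exception {
        new SwallowedAsyncExceptionSketch().run();
    }
}
{code}

Moving the guard out of the handler and into the caller, or tying the reader thread's lifetime to the task as suggested above, would ensure the failure is at least recorded before the task is allowed to finish.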
> ContinuousFileReaderOperator swallows exceptions
> ------------------------------------------------
>
>                 Key: FLINK-7430
>                 URL: https://issues.apache.org/jira/browse/FLINK-7430
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, filesystem-connector
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: - macOS 10.12.6
> - Oracle JDK 1.8.0_144
> - Flink 1.3.2
>            Reporter: Peter Ertl
>            Priority: Critical
>
> The class *ContinuousFileReaderOperator* is swallowing exceptions, as the following example demonstrates:
> {code:java}
> package org.apache.flink.streaming.examples;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintWriter;
>
> import org.apache.flink.api.common.io.OutputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class FormatExceptionSwallowed {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         File bla = File.createTempFile("foo", "baz");
>         try (PrintWriter w = new PrintWriter(bla)) {
>             w.println("one");
>             w.println("two");
>             w.println("three");
>         }
>
>         env.readTextFile(bla.getCanonicalPath())
>             .writeUsingOutputFormat(new OutputFormat<String>() {
>                 @Override
>                 public void configure(final Configuration parameters) {
>                 }
>
>                 @Override
>                 public void open(final int taskNumber, final int numTasks) throws IOException {
>                 }
>
>                 @Override
>                 public void writeRecord(final String record) throws IOException {
>                     throw new IllegalArgumentException("bla");
>                 }
>
>                 @Override
>                 public void close() throws IOException {
>                 }
>             });
>
>         env.execute("go");
>
>         // JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ...
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)