[ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123207#comment-16123207 ]
Peter Ertl edited comment on FLINK-7430 at 8/11/17 11:20 AM:
-------------------------------------------------------------

It seems like the task itself has already finished while the output format is still writing asynchronously!? In that case the "isRunning" flag in *StreamTask* is already *false*, which makes the following method skip the exception handling.

StreamTask.java @ line 829

{code:java}
@Override
public void handleAsyncException(String message, Throwable exception) {
    if (isRunning) {
        // only fail if the task is still running
        getEnvironment().failExternally(exception);
    }
}
{code}


was (Author: pete):
It seems like the task itself has already finished while the output format is still writing asynchronously. In that case the "isRunning" flag in *StreamTask* is already *false*, which makes the following method skip the exception handling.

StreamTask.java @ line 829

{code:java}
@Override
public void handleAsyncException(String message, Throwable exception) {
    if (isRunning) {
        // only fail if the task is still running
        getEnvironment().failExternally(exception);
    }
}
{code}

> ContinuousFileReaderOperator swallows exceptions
> ------------------------------------------------
>
>                 Key: FLINK-7430
>                 URL: https://issues.apache.org/jira/browse/FLINK-7430
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.2
>         Environment: - macOS 10.12.6
> - Oracle JDK 1.8.0_144
> - Flink 1.3.2
>            Reporter: Peter Ertl
>
> The class *ContinuousFileReaderOperator* is swallowing exceptions, as the
> following example demonstrates:
> {code:java}
> package org.apache.flink.streaming.examples;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintWriter;
>
> import org.apache.flink.api.common.io.OutputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class FormatExceptionSwallowed {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // write a small three-line input file
>         File bla = File.createTempFile("foo", "baz");
>         try (PrintWriter w = new PrintWriter(bla)) {
>             w.println("one");
>             w.println("two");
>             w.println("three");
>         }
>
>         env.readTextFile(bla.getCanonicalPath())
>             .writeUsingOutputFormat(new OutputFormat<String>() {
>                 @Override
>                 public void configure(final Configuration parameters) {
>                 }
>
>                 @Override
>                 public void open(final int taskNumber, final int numTasks) throws IOException {
>                 }
>
>                 @Override
>                 public void writeRecord(final String record) throws IOException {
>                     // every record fails, so the job should fail as well
>                     throw new IllegalArgumentException("bla");
>                 }
>
>                 @Override
>                 public void close() throws IOException {
>                 }
>             });
>
>         env.execute("go");
>
>         // JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ...
>     }
> }
> {code}
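
The guard quoted in the comment can be illustrated in isolation. The following is only a minimal sketch of the suspected mechanism, not Flink code: SketchTask, reportedFailure and reportAfter are hypothetical names, and the real StreamTask forwards the exception to getEnvironment().failExternally(...) instead of storing it. It assumes, as the comment suspects, that the task has already stopped running by the time the asynchronous failure is reported.

```java
import java.util.concurrent.atomic.AtomicReference;

public class AsyncExceptionSketch {

    /** Hypothetical stand-in for a task; this is NOT Flink's StreamTask. */
    static class SketchTask {
        volatile boolean isRunning = true;

        // Stands in for the effect of getEnvironment().failExternally(exception).
        final AtomicReference<Throwable> reportedFailure = new AtomicReference<>();

        void handleAsyncException(String message, Throwable exception) {
            if (isRunning) {
                // only record the failure if the task is still running
                reportedFailure.compareAndSet(null, exception);
            }
            // otherwise the exception is dropped without any trace
        }
    }

    /**
     * Reports an asynchronous failure to a fresh task and returns what the
     * task recorded. If finishFirst is true, the task is marked as finished
     * before the failure arrives.
     */
    static Throwable reportAfter(boolean finishFirst) {
        SketchTask task = new SketchTask();
        if (finishFirst) {
            task.isRunning = false;
        }
        task.handleAsyncException("async write failed",
                new IllegalArgumentException("bla"));
        return task.reportedFailure.get();
    }

    public static void main(String[] args) {
        System.out.println("reported while running: " + reportAfter(false));
        System.out.println("reported after finish:  " + reportAfter(true));
    }
}
```

In this sketch the failure is recorded only when it arrives while isRunning is still true. Once the flag has been cleared, the same call returns without leaving any record, which would match the silent job termination described in the issue.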