[ https://issues.apache.org/jira/browse/FLINK-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154966#comment-16154966 ]

Till Rohrmann edited comment on FLINK-7430 at 9/6/17 8:02 AM:
--------------------------------------------------------------

Thanks for reporting this issue [~pete]. I think the underlying problem is not 
strictly FLINK-6833 but rather that the {{ContinuousFileReaderOperator}} does 
not adhere to the {{StreamOperator}} lifecycle model. The 
{{ContinuousFileReaderOperator}} consumes input splits from an upstream source 
and hands them to another thread running the {{SplitReader}}, which does the 
actual file reading. The problem is that the {{ContinuousFileReaderOperator}} 
and the {{SplitReader}} are only loosely coupled, and the former does not 
ensure that the latter sticks to the lifecycle of the {{StreamTask}}. As a 
consequence, the {{ContinuousFileReaderOperator}} can finish processing, which 
switches the underlying {{StreamTask}} to {{isRunning == false}}, even though 
the {{SplitReader}} is still processing input splits. If the {{SplitReader}} 
then wants to fail the {{StreamTask}} because it encountered a problem, the 
exception won't be propagated, because the {{StreamTask}} is already in the 
{{isRunning == false}} state.
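
The following self-contained sketch illustrates the race (simplified, not the 
actual Flink classes; {{MiniTask}} and {{reportAsyncError}} are made-up names 
standing in for the {{StreamTask}} and its async exception handler):

{code:java}
import java.util.concurrent.CountDownLatch;

// Simplified sketch of the race described above; MiniTask and
// reportAsyncError are illustrative stand-ins, not Flink's actual classes.
public class SwallowedAsyncException {

    static class MiniTask {
        volatile boolean isRunning = true;

        // mirrors the isRunning guard: errors arriving after the task
        // has finished are silently dropped
        void reportAsyncError(Throwable t) {
            if (isRunning) {
                System.err.println("task failed: " + t);
            }
            // else: the exception is swallowed
        }
    }

    public static void main(String[] args) throws Exception {
        final MiniTask task = new MiniTask();
        final CountDownLatch operatorFinished = new CountDownLatch(1);

        // plays the SplitReader: still working after the operator is done
        Thread reader = new Thread(() -> {
            try {
                operatorFinished.await();
                task.reportAsyncError(new IllegalStateException("split read failed"));
            } catch (InterruptedException ignored) {
            }
        });
        reader.start();

        // plays the ContinuousFileReaderOperator: finishes and flips the flag
        task.isRunning = false;
        operatorFinished.countDown();

        reader.join();
        // the program exits cleanly; the reader's failure was never surfaced
    }
}
{code}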

One solution could be to move the {{isRunning}} guard from the 
{{handleAsyncException}} method to its caller ({{StreamTask.java:932}}), 
similar to how {{StreamTask#triggerCheckpoint}} and 
{{StreamTask#performCheckpoint}} handle it. Another solution could be to 
properly couple the {{SplitReader}} to the lifecycle of the underlying 
{{StreamTask}}.
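
A rough sketch of what the latter could look like (illustrative shape only, 
not the actual {{ContinuousFileReaderOperator}} code): the operator's 
{{close()}} joins the reader thread and rethrows any failure it collected, so 
no error can arrive after the task has left the running state.

{code:java}
// Illustrative sketch of coupling the reader thread to the operator
// lifecycle; not the actual ContinuousFileReaderOperator implementation.
class CoupledReaderOperatorSketch {

    private Thread readerThread;
    private volatile Throwable readerError;

    void open() {
        readerThread = new Thread(() -> {
            try {
                // ... consume input splits and read files ...
            } catch (Throwable t) {
                // remember the failure instead of reporting it asynchronously
                readerError = t;
            }
        });
        readerThread.start();
    }

    // invoked while the task is still isRunning == true
    void close() throws Exception {
        readerThread.join();   // the reader cannot outlive the operator
        if (readerError != null) {
            throw new Exception("SplitReader failed", readerError);
        }
    }
}
{code}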

The loose coupling has further implications: for example, it thwarts proper 
checkpointing, because every {{StreamTask}} with {{isRunning == false}} will 
abort an incoming checkpoint.
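
Simplified, the checkpoint path has a guard of the following shape (a sketch, 
not the actual {{StreamTask#performCheckpoint}} code), which is why a task 
that finished while its {{SplitReader}} is still busy also declines all 
further checkpoints:

{code:java}
// Simplified sketch of the checkpoint guard, not the actual StreamTask code.
class CheckpointGuardSketch {

    private final Object lock = new Object();
    private volatile boolean isRunning;

    boolean triggerCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (!isRunning) {
                return false;   // checkpoint is declined/aborted
            }
            // ... snapshot state and forward the barrier downstream ...
            return true;
        }
    }
}
{code}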

Maybe [~aljoscha] or [~kkl0u] can chime in.


> ContinuousFileReaderOperator swallows exceptions
> ------------------------------------------------
>
>                 Key: FLINK-7430
>                 URL: https://issues.apache.org/jira/browse/FLINK-7430
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, filesystem-connector
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: - macOS 10.12.6
> - Oracle JDK 1.8.0_144
> - Flink 1.3.2
>            Reporter: Peter Ertl
>            Priority: Critical
>
> The class *ContinuousFileReaderOperator* is swallowing exceptions as the 
> following example demonstrates:
> {code:java}
> package org.apache.flink.streaming.examples;
> 
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintWriter;
> 
> import org.apache.flink.api.common.io.OutputFormat;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class FormatExceptionSwallowed {
> 
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
> 
>         // write a small input file with three records
>         File bla = File.createTempFile("foo", "baz");
>         try (PrintWriter w = new PrintWriter(bla)) {
>             w.println("one");
>             w.println("two");
>             w.println("three");
>         }
> 
>         env.readTextFile(bla.getCanonicalPath())
>             .writeUsingOutputFormat(new OutputFormat<String>() {
>                 @Override
>                 public void configure(final Configuration parameters) {
>                 }
> 
>                 @Override
>                 public void open(final int taskNumber, final int numTasks) throws IOException {
>                 }
> 
>                 @Override
>                 public void writeRecord(final String record) throws IOException {
>                     // every record fails, yet the job completes without reporting it
>                     throw new IllegalArgumentException("bla");
>                 }
> 
>                 @Override
>                 public void close() throws IOException {
>                 }
>             });
> 
>         env.execute("go");
> 
>         // JOB TERMINATES WITH NO EXCEPTION / ERROR whatsoever ...
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
