Hi dev,

After some debugging and analysis, the culprit has finally been found.

To cope with the occasional data-format errors and intermittent source exceptions (the Internet connection is somewhat unstable in my environment) that kept interrupting my Flink jobs, I added a broad catch block in *org.apache.flink.streaming.api.operators.StreamSource#run* to catch any exception thrown in the source operator by *userFunction.run(ctx)*. This patch worked well and kept the jobs running smoothly without having to restart them again and again.

However, when an IOException is thrown in KafkaFetcher#run, there is no catch block in that method, so the exception goes straight to the finally block, which calls *consumerThread.shutdown()*. Therefore, even though the exception is later caught in StreamSource, it has already caused KafkaFetcher to permanently shut down the consumer thread, without any logging or warning, so Flink can do nothing but finish the job. A simplified sketch of both code paths follows after the quoted message below.

In conclusion, it was my fault not to catch the exception in KafkaFetcher, and Flink is not the one to blame : )

Thanks

Sincerely,
Weike

On Tue, Mar 17, 2020 at 11:44 AM DONG, Weike <kyled...@connect.hku.hk> wrote:

> Hi community,
>
> I have noticed that when a wrong CSV record is ingested and deserialized
> by CsvRowDeserializationSchema with ignoreParseErrors set to false, an
> IOException is thrown, which is expected. In earlier Flink versions, the
> Flink YARN app would then terminate with FINISHED state and FAILED final
> status for per-job mode Flink jobs without a RestartStrategy specifically
> set.
>
> However, to my surprise, in Flink 1.10 the YARN app switches to SUCCEEDED
> final status afterwards, which is weird, since the job termination was
> caused by abnormal data input rather than the normal end of the source
> stream.
>
> This confuses me, because a SUCCEEDED final status normally implies to
> users that the job has ended *successfully*, which is definitely not the
> case here. So I wonder whether this is the intended behavior of Flink
> 1.10 on data deserialization errors.
>
> Thank you for reading : )
>
> Sincerely,
> Weike
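For anyone curious, the patch is essentially the following. This is a simplified sketch of the idea, not the verbatim Flink method body (the surrounding code differs across versions, and LOG stands for whatever logger the class already has):

    // Inside StreamSource#run, wrapping the call into the user source
    // function. Broadly catching here keeps the job alive on transient
    // data-format or connectivity errors instead of failing the job.
    try {
        userFunction.run(ctx);
    } catch (Exception e) {
        // Assumption: swallowing and logging is acceptable for this job;
        // records in flight when the error occurred may be lost.
        LOG.warn("Source function threw an exception, ignoring it.", e);
    }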
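And the fetch loop that defeats the patch has roughly this shape (again a sketch; the real KafkaFetcher code differs in detail, and pollAndEmitRecords() is a hypothetical stand-in for the actual polling logic):

    // Simplified shape of KafkaFetcher#run as described above.
    try {
        while (running) {
            // An IOException thrown while polling and emitting records
            // propagates out, as there is no catch block in this method...
            pollAndEmitRecords();
        }
    } finally {
        // ...so it lands directly here, permanently shutting down the
        // consumer thread before StreamSource ever sees the exception.
        consumerThread.shutdown();
    }

So even though the patched StreamSource catches the exception afterwards, the consumer thread is already gone and the source can only finish.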
In order to cope with occasional data-format errors and intermittent source exceptions (Internet connection is somewhat unstable in my environment) that interrupt my Flink jobs occasionally, I have added a broad catch block in *org.apache.flink.streaming.api.operators.StreamSource#run* method to catch any exceptions thrown in the source operator by* userFunction.run(ctx);*, and this patch works well and keeps the jobs running smoothly without having to restart the job again and again. However, when the IOException is thrown at KafkaFetcher#run, in this method no catch block is defined, thus the exception would go directly to the finally block, which calls *consumerThread.shutdown();. *Therefore, even when this exception is later caught in StreamSource, it already causes KafkaFetcher to permanently close the consumer thread without any logging or warnings, so Flink can do nothing but finish the job. [image: image.png] In conclusion, it is my fault not to catch the exception in KafkaFetcher, and Flink is not the one to blame : ) Thanks Sincerely, Weike On Tue, Mar 17, 2020 at 11:44 AM DONG, Weike <kyled...@connect.hku.hk> wrote: > Hi community, > > I have noticed that when a wrong CSV record is ingested and deserialized > at CsvRowDeserializationSchema, when ignoreParseErrors is set to false, > then an IOException is thrown, which is expected, and in earlier Flink > versions, the Flink YARN app would terminated with FINISHED state and > FAILED final status for per-job mode Flink jobs without specifically > setting a RestartStrategy. > > However, to my surprise, in Flink 1.10, the YARN app switched to SUCCEEDED > final status afterwards, which is weird since that this Flink job > termination was caused by abnormal data input, instead of normal end of > source stream. > > Here I am confused because normally SUCCEEDED final status would give > users implications that the job has been **successfully** ended, however > this is definitely not the case. So I just wonder if this is the designed > behavior of Flink 1.10 on data deserialization errors. > > Thank you for reading : ) > > Sincerely, > Weike >