[ https://issues.apache.org/jira/browse/FLINK-30032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haoze Wu updated FLINK-30032:
-----------------------------
Description:

We are testing Flink (version 1.14.0). We launch one StandaloneSessionClusterEntrypoint and two TaskManagerRunners, then run a Flink client that submits a WordCount workload. The code is similar to [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java], except that we add a Kafka topic output:
{code:java}
private static DataStreamSink<String> addKafkaSink(
        final DataStream<String> events, final String brokers, final String topic) {
    return events.sinkTo(KafkaSink.<String>builder()
            .setBootstrapServers(brokers)
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.<String>builder()
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .setTopic(topic)
                            .build())
            .build());
}

public static void run(final String[] args) throws Exception {
    final String brokers = args[0];
    final String textFilePath = args[1];
    final String kafkaTopic = args[2];
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    final DataStream<String> text = env.readTextFile(textFilePath);
    // Tokenizer: the word-splitting FlatMapFunction from the linked WordCount example
    final DataStream<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
    addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
    final long nano = System.nanoTime();
    env.execute("WordCount");
    FlinkGrayClientMain.reply("success", nano);
}
{code}
We found that the Kafka topic sometimes fails to receive a few messages. We reproduced the symptom multiple times: the topic always gets 160 to 169 messages, while 170 are expected, and the missing messages are always the last few of the 170 expected messages. We then inspected the logs and the code.
First, one of the TaskManagerRunners logs the following exception:
{code:java}
2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.RuntimeException: McGray injected exception
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    ... 24 more
Caused by: java.io.IOException
{code}
The IOException originates at line 104 of `RecordWriter#emit`:
{code:java}
protected void emit(T record, int targetSubpartition) throws IOException {
    checkErroneous();

    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104

    if (flushAlways) {
        targetPartition.flush(targetSubpartition);
    }
}
{code}
Here, `targetPartition.emitRecord` eventually performs file I/O or memory-mapped I/O, either of which can throw the IOException.

The exception is caught in `RecordWriterOutput#emitWatermark` and rethrown as a `RuntimeException`:
{code:java}
@Override
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }

    watermarkGauge.setCurrentWatermark(mark.getTimestamp());
    serializationDelegate.setInstance(mark);

    try {
        recordWriter.broadcastEmit(serializationDelegate);
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
{code}
It is then caught in `ChainingOutput#emitWatermark` and rethrown as an `ExceptionInChainedOperatorException`:
{code:java}
@Override
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }
    try {
        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        input.processWatermark(mark);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
{code}
Finally, it is caught in `ContinuousFileReaderOperator#finish`, which only logs a warning:
{code:java}
@Override
public void finish() throws Exception {
    LOG.debug("finishing");
    super.finish();

    switch (state) {
        case IDLE:
            switchState(ReaderState.FINISHED);
            break;
        case FINISHED:
            LOG.warn("operator is already closed, doing nothing");
            return;
        default:
            switchState(ReaderState.FINISHING);
            while (!state.isTerminal()) {
                executor.yield();
            }
    }

    try {
        sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
    } catch (Exception e) {
        LOG.warn("unable to emit watermark while closing", e);
    }
}
{code}
Here, `Watermark.MAX_WATERMARK` is emitted so that the computation can finish properly.

In Flink 1.14.0, the full call stack of this workflow is:
{code:java}
org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
org.apache.flink.runtime.taskmanager.Task#doRun:766
org.apache.flink.runtime.taskmanager.Task#run:575
java.lang.Thread#run:748
{code}
We think the last few messages go missing from the Kafka topic because of `ChannelSelectorRecordWriter#broadcastEmit`, which also appears in the call stack above:
{code:java}
@Override
public void broadcastEmit(T record) throws IOException {
    checkErroneous();

    // Emitting to all channels in a for loop can be better than calling
    // ResultPartitionWriter#broadcastRecord because the broadcastRecord
    // method incurs extra overhead.
    ByteBuffer serializedRecord = serializeRecord(serializer, record);
    for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
        serializedRecord.rewind();
        emit(record, channelIndex); // line 67
    }

    if (flushAlways) {
        flushAll();
    }
}
{code}
Line 67 emits `Watermark.MAX_WATERMARK` (coming from `ContinuousFileReaderOperator#finish`) to each channel in turn. When the IOException is thrown for one channel, it propagates out of the loop, so line 67 never runs for the remaining channels; `ContinuousFileReaderOperator#finish` then swallows the exception after logging it. Across multiple reproductions, the number of missing messages is exactly equal to the number of affected channels.
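To make this failure mode concrete, here is a toy model in plain Java. It is not Flink code: `Channel`, `broadcastEmit`, and `finish` are hypothetical stand-ins that only mimic the shape of the loop in `ChannelSelectorRecordWriter#broadcastEmit` and the try/catch in `ContinuousFileReaderOperator#finish`. It shows which channels end up without `MAX_WATERMARK` when one channel's write throws.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code: Channel, broadcastEmit and finish are hypothetical
// stand-ins for the Flink methods discussed above.
public class BroadcastAbortDemo {

    static final String MAX_WATERMARK = "MAX_WATERMARK";

    // A channel that records what it receives; a "failing" channel throws on every write.
    static final class Channel {
        final List<String> received = new ArrayList<>();
        final boolean failing;

        Channel(boolean failing) {
            this.failing = failing;
        }

        void write(String element) throws IOException {
            if (failing) {
                throw new IOException("simulated write failure");
            }
            received.add(element);
        }
    }

    // Same shape as the loop in ChannelSelectorRecordWriter#broadcastEmit:
    // no per-channel error handling, so one failure skips every later channel.
    static void broadcastEmit(List<Channel> channels, String element) throws IOException {
        for (Channel channel : channels) {
            channel.write(element);
        }
    }

    // Same shape as the try/catch in ContinuousFileReaderOperator#finish:
    // the exception is logged and then swallowed.
    static void finish(List<Channel> channels) {
        try {
            broadcastEmit(channels, MAX_WATERMARK);
        } catch (IOException e) {
            System.out.println("WARN unable to emit watermark while closing: " + e.getMessage());
        }
    }

    // Counts channels that never received the final watermark.
    static int channelsMissingWatermark(List<Channel> channels) {
        int missing = 0;
        for (Channel channel : channels) {
            if (!channel.received.contains(MAX_WATERMARK)) {
                missing++;
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<Channel> channels = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            channels.add(new Channel(i == 1)); // only channel 1 fails
        }
        finish(channels);
        System.out.println("channels missing MAX_WATERMARK: " + channelsMissingWatermark(channels));
    }
}
```

Running `main` prints the warning and then reports 3 channels without the watermark: the failing channel 1 plus channels 2 and 3, which the loop never reaches. Channel 0, which comes before the failure, still receives it, and nothing propagates past `finish` except the log line.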
We therefore suspect that a potential IOException at line 67 is not handled properly: the resulting symptom and logs make the problem hard for users to notice or debug. Users may suddenly find the last few messages missing, and the only clue is a warning, somewhere in the logs, about an exception while emitting `Watermark.MAX_WATERMARK`. They still cannot tell why or how those messages went missing.

We would like to propose a fix. A simple solution is to catch the IOException at line 67, log it, and possibly retry the emission. We implemented this and found that the symptom disappears. However, `broadcastEmit` is called from many places, so this fix would also affect the other callers, and we are not sure whether the same behavior is appropriate for them. We welcome suggestions and feedback. Thanks!
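A minimal sketch of the proposed catch-log-retry idea follows, assuming a simplified per-channel writer. It is not Flink's API and not an accepted patch: `ChannelWriter` and `broadcastEmitWithRetry` are hypothetical names, and `ChannelWriter#emit` only stands in for `RecordWriter#emit(record, channelIndex)`. Each channel gets a bounded number of attempts, a channel that keeps failing does not stop the loop, and the first permanent failure is rethrown at the end so it is not silently lost.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed fix, not Flink's API or an accepted patch:
// ChannelWriter stands in for RecordWriter#emit(record, channelIndex).
public class RetryingBroadcastSketch {

    interface ChannelWriter {
        void emit(String element, int channelIndex) throws IOException;
    }

    // Emits to every channel; each failed emit is logged and retried up to maxAttempts times.
    // A channel that still fails does not stop the loop; the first such failure is rethrown
    // at the end (later ones attached as suppressed) so it is not silently lost.
    static void broadcastEmitWithRetry(
            ChannelWriter writer, String element, int numberOfChannels, int maxAttempts)
            throws IOException {
        IOException firstFailure = null;
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    writer.emit(element, channelIndex);
                    break; // success: continue with the next channel
                } catch (IOException e) {
                    System.out.printf(
                            "WARN emit to channel %d failed (attempt %d of %d): %s%n",
                            channelIndex, attempt, maxAttempts, e.getMessage());
                    if (attempt == maxAttempts) {
                        if (firstFailure == null) {
                            firstFailure = e;
                        } else if (firstFailure != e) {
                            firstFailure.addSuppressed(e);
                        }
                    }
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    public static void main(String[] args) throws IOException {
        List<Integer> delivered = new ArrayList<>();
        int[] failuresLeft = {1}; // channel 1 fails once, then succeeds on retry
        ChannelWriter flaky = (element, channelIndex) -> {
            if (channelIndex == 1 && failuresLeft[0]-- > 0) {
                throw new IOException("transient failure");
            }
            delivered.add(channelIndex);
        };
        broadcastEmitWithRetry(flaky, "MAX_WATERMARK", 4, 3);
        System.out.println("delivered to channels " + delivered);
    }
}
```

With one transient failure on channel 1, `main` logs one warning and the element still reaches channels 0 to 3. Whether retrying is actually safe depends on whether a failed `emitRecord` call can leave a partially written record in the subpartition, which this sketch does not model; that uncertainty is part of why the same behavior may not suit every caller of `broadcastEmit`.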
> IOException during MAX_WATERMARK emission causes message missing
> ----------------------------------------------------------------
>
>                 Key: FLINK-30032
>                 URL: https://issues.apache.org/jira/browse/FLINK-30032
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
The code is similar to > [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java], > and we only add a Kafka topic output: > {code:java} > private static DataStreamSink<String> addKafkaSink( > final DataStream<String> events, final String brokers, final > String topic) { > return events.sinkTo(KafkaSink.<String>builder() > .setBootstrapServers(brokers) > .setRecordSerializer( > KafkaRecordSerializationSchema.<String>builder() > .setValueSerializationSchema(new > SimpleStringSchema()) > .setTopic(topic) > .build()) > .build()); > } > public static void run(final String[] args) throws Exception { > final String brokers = args[0]; > final String textFilePath = args[1]; > final String kafkaTopic = args[2]; > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > final DataStream<String> text = env.readTextFile(textFilePath); > final DataStream<Tuple2<String, Integer>> counts = > text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1); > addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic); > final long nano = System.nanoTime(); > env.execute("WordCount"); > FlinkGrayClientMain.reply("success", nano); > } > {code} > We found that sometimes the Kafka topic fails to receive a few messages. We > reproduce the symptom multiple times. We found that the Kafka topic always > gets 160~169 messages while the expected number of messages is 170. We also > found that the missing messages are always the expected last few messages > from the 170 expected messages. > Then we inspect the logs and code. 
> First, we have an IOException to one of the TaskManagerRunner: > {code:java} > 2021-11-02T17:43:41,070 WARN source.ContinuousFileReaderOperator > (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark > while closing > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > [flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > [flink-dist_2.11-1.14.0.jar:1.14.0] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > [flink-dist_2.11-1.14.0.jar:1.14.0] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > [flink-dist_2.11-1.14.0.jar:1.14.0] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275] > Caused by: java.lang.RuntimeException: McGray injected 
exception > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > at > org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) > ~[flink-dist_2.11-1.14.0.jar:1.14.0] > ... 24 more > Caused by: java.io.IOException {code} > The IOException is from line 104 in RecordWriter#emit: > {code:java} > protected void emit(T record, int targetSubpartition) throws IOException { > checkErroneous(); > targetPartition.emitRecord(serializeRecord(serializer, record), > targetSubpartition); // line 104 > if (flushAlways) { > targetPartition.flush(targetSubpartition); > } > } {code} > Here, `targetPartition.emitRecord` will finally call some file I/O or memory > map I/O, triggering the IOException for some reason. 
> This exception is caught in `RecordWriterOutput#emitWatermark` and rethrown as a `RuntimeException`:
> {code:java}
> @Override
> public void emitWatermark(Watermark mark) {
>     if (announcedStatus.isIdle()) {
>         return;
>     }
>     watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>     serializationDelegate.setInstance(mark);
>     try {
>         recordWriter.broadcastEmit(serializationDelegate);
>     } catch (Exception e) {
>         throw new RuntimeException(e.getMessage(), e);
>     }
> }
> {code}
> It is then caught in `ChainingOutput#emitWatermark` and wrapped in an `ExceptionInChainedOperatorException`:
> {code:java}
> @Override
> public void emitWatermark(Watermark mark) {
>     if (announcedStatus.isIdle()) {
>         return;
>     }
>     try {
>         watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>         input.processWatermark(mark);
>     } catch (Exception e) {
>         throw new ExceptionInChainedOperatorException(e);
>     }
> }
> {code}
> Finally, it is caught in `ContinuousFileReaderOperator#finish`, which only logs a warning:
> {code:java}
> @Override
> public void finish() throws Exception {
>     LOG.debug("finishing");
>     super.finish();
>
>     switch (state) {
>         case IDLE:
>             switchState(ReaderState.FINISHED);
>             break;
>         case FINISHED:
>             LOG.warn("operator is already closed, doing nothing");
>             return;
>         default:
>             switchState(ReaderState.FINISHING);
>             while (!state.isTerminal()) {
>                 executor.yield();
>             }
>     }
>
>     try {
>         sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
>     } catch (Exception e) {
>         LOG.warn("unable to emit watermark while closing", e);
>     }
> }
> {code}
> Here, `Watermark.MAX_WATERMARK` is emitted so that downstream operators can properly finish the computation.
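To make the wrapping chain concrete, here is a minimal, self-contained Java sketch. The classes and methods are hypothetical, simplified stand-ins named after the Flink methods quoted above, not Flink's actual API. The sketch assumes that the partition write always fails with an IOException and uses a `Long.MAX_VALUE` timestamp in place of `Watermark.MAX_WATERMARK`. It shows that the checked IOException is wrapped twice and then swallowed, so `finish()` returns normally and a single log entry is the only trace left.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the Flink methods quoted above.
// None of these types or methods are the real Flink API.
public class WatermarkSwallowSketch {

    /** Collects what the real operator would pass to LOG.warn(...). */
    static final List<Throwable> WARNINGS = new ArrayList<>();

    /** Stand-in for ExceptionInChainedOperatorException. */
    static class ChainedOperatorException extends RuntimeException {
        ChainedOperatorException(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in for RecordWriter#broadcastEmit: assumed to always fail. */
    static void broadcastEmit(long watermark) throws IOException {
        throw new IOException("injected failure");
    }

    /** Like RecordWriterOutput#emitWatermark: checked IOException -> RuntimeException. */
    static void recordWriterOutputEmit(long watermark) {
        try {
            broadcastEmit(watermark);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    /** Like ChainingOutput#emitWatermark: wraps the RuntimeException once more. */
    static void chainingOutputEmit(long watermark) {
        try {
            recordWriterOutputEmit(watermark);
        } catch (Exception e) {
            throw new ChainedOperatorException(e);
        }
    }

    /** Like ContinuousFileReaderOperator#finish: records a warning and returns normally. */
    static void finish() {
        try {
            chainingOutputEmit(Long.MAX_VALUE); // stands in for Watermark.MAX_WATERMARK
        } catch (Exception e) {
            WARNINGS.add(e); // stands in for LOG.warn("unable to emit watermark while closing", e)
        }
    }

    /** Follows getCause() links down to the innermost exception. */
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        finish(); // no exception escapes, so the caller never learns about the failure
        Throwable logged = WARNINGS.get(0);
        System.out.println(logged.getClass().getSimpleName());            // ChainedOperatorException
        System.out.println(rootCause(logged).getClass().getSimpleName()); // IOException
    }
}
```

Because the last handler catches `Exception` and only logs it, nothing above `finish()` can tell that the watermark was never delivered.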
> In Flink 1.14.0, the full call stack of the workflow described above is:
> {code:java}
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
> org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
> org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
> org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
> org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
> org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
> org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
> org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
> org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
> org.apache.flink.runtime.taskmanager.Task#doRun:766
> org.apache.flink.runtime.taskmanager.Task#run:575
> java.lang.Thread#run:748
> {code}
> We think the reason a few trailing messages are missing from the Kafka topic lies in `ChannelSelectorRecordWriter#broadcastEmit` (which also appears in the call stack above):
> {code:java}
> @Override
> public void broadcastEmit(T record) throws IOException {
>     checkErroneous();
>     // Emitting to all channels in a for loop can be better than calling
>     // ResultPartitionWriter#broadcastRecord because the broadcastRecord
>     // method incurs extra overhead.
>     ByteBuffer serializedRecord = serializeRecord(serializer, record);
>     for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
>         serializedRecord.rewind();
>         emit(record, channelIndex); // line 67
>     }
>
>     if (flushAlways) {
>         flushAll();
>     }
> }
> {code}
> Line 67 tries to emit `Watermark.MAX_WATERMARK` (from `ContinuousFileReaderOperator#finish`) to every channel. When the IOException is thrown there, the loop aborts and line 67 never runs for the remaining channels, while `ContinuousFileReaderOperator#finish` swallows the exception. We reproduced the symptom multiple times and found that the number of missing messages is exactly equal to the number of affected channels.
> Therefore, we think the potential IOException at line 67 is not handled properly: with the current behavior and logging, the user can hardly notice or debug the issue. The user may suddenly find a few trailing messages missing.
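The loop behavior described above can be modeled with a small, self-contained Java sketch. `Channel`, `broadcastEmit` and `broadcastEmitAll` are hypothetical names, not Flink code. The sketch assumes 4 channels with an injected IOException on channel 1. The first loop mirrors the quoted one: channel 0 receives the watermark, and channels 1 to 3 do not. For comparison, the second loop is a hypothetical variant (not Flink's, and not necessarily the reporters' patch) that catches the exception per channel, keeps going, and rethrows the first failure at the end so callers still see it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the broadcastEmit loop; not Flink code.
public class BroadcastAbortSketch {

    /** One downstream channel; 'failing' simulates the injected IOException. */
    static final class Channel {
        final boolean failing;
        boolean gotWatermark;

        Channel(boolean failing) {
            this.failing = failing;
        }

        void emit(long watermark) throws IOException {
            if (failing) {
                throw new IOException("injected failure");
            }
            gotWatermark = true;
        }
    }

    /** Mirrors the quoted loop: the first IOException skips all later channels. */
    static void broadcastEmit(List<Channel> channels, long watermark) throws IOException {
        for (Channel c : channels) {
            c.emit(watermark); // corresponds to "line 67"
        }
    }

    /** Hypothetical variant: try every channel, then rethrow the first failure. */
    static void broadcastEmitAll(List<Channel> channels, long watermark) throws IOException {
        IOException first = null;
        for (Channel c : channels) {
            try {
                c.emit(watermark);
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e); // keep later failures visible too
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    /** Number of channels that actually received the watermark. */
    static long delivered(List<Channel> channels) {
        return channels.stream().filter(c -> c.gotWatermark).count();
    }

    /** Builds n channels where only the channel at failingIndex throws. */
    static List<Channel> channels(int n, int failingIndex) {
        List<Channel> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(new Channel(i == failingIndex));
        }
        return list;
    }

    public static void main(String[] args) {
        List<Channel> a = channels(4, 1);
        try {
            broadcastEmit(a, Long.MAX_VALUE);
        } catch (IOException ignored) {
            // swallowed, as ContinuousFileReaderOperator#finish does
        }
        System.out.println("original loop: " + delivered(a) + " of 4 channels"); // 1 of 4

        List<Channel> b = channels(4, 1);
        try {
            broadcastEmitAll(b, Long.MAX_VALUE);
        } catch (IOException ignored) {
            // still surfaced to the caller; ignored here only for the demo
        }
        System.out.println("try-all loop:  " + delivered(b) + " of 4 channels"); // 3 of 4
    }
}
```

In this model, the channels that miss the end-of-input watermark are exactly the failing channel and every channel after it. Whether retrying or continuing after a failed write is safe inside Flink's network stack is not established by this sketch.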
> All the user can find is an IOException logged somewhere while emitting `Watermark.MAX_WATERMARK`; they still cannot tell why, or how, the last few messages went missing.
> We would like to propose a fix for this issue. A simple solution is to catch the IOException at line 67, log it, and possibly retry the emit. We implemented this solution and found that the symptom disappears. However, `broadcastEmit` is called in many places, so this fix would also affect the other callers, and we are not sure whether the new behavior is appropriate for them.
> We are looking for suggestions and feedback. Thanks!

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)