[ 
https://issues.apache.org/jira/browse/FLINK-30032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoze Wu updated FLINK-30032:
-----------------------------
    Description: 
We are testing Flink (version 1.14.0). We launch 1 
StandaloneSessionClusterEntrypoint and 2 TaskManagerRunner processes, then run a 
Flink client that submits a WordCount workload. The code is similar to 
[https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java];
 the only change is that we add a Kafka topic as the output:
{code:java}
    private static DataStreamSink<String> addKafkaSink(
            final DataStream<String> events, final String brokers, final String topic) {
        return events.sinkTo(KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .setTopic(topic)
                                .build())
                .build());
    }

    public static void run(final String[] args) throws Exception {
        final String brokers = args[0];
        final String textFilePath = args[1];
        final String kafkaTopic = args[2];
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        final DataStream<String> text = env.readTextFile(textFilePath);
        final DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
        addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
        final long nano = System.nanoTime();
        env.execute("WordCount");
        FlinkGrayClientMain.reply("success", nano);
    }
 {code}
We found that the Kafka topic sometimes fails to receive a few messages. We 
reproduced the symptom multiple times. In those runs the topic receives 160 to 
169 messages instead of the expected 170, and the missing messages are always 
the last few of the 170 expected messages.

We then inspected the logs and the code.

First, one of the TaskManagerRunner processes logged an IOException:
{code:java}
2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.RuntimeException: McGray injected exception
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
        ... 24 more
Caused by: java.io.IOException {code}
The IOException is thrown at line 104 of `RecordWriter#emit`:
{code:java}
    protected void emit(T record, int targetSubpartition) throws IOException {
        checkErroneous();

        targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104

        if (flushAlways) {
            targetPartition.flush(targetSubpartition);
        }
    } {code}
Here, `targetPartition.emitRecord` eventually performs file I/O or 
memory-mapped I/O, which can fail with an IOException.

This exception is caught in `RecordWriterOutput#emitWatermark` and rethrown as 
a RuntimeException:
{code:java}
    @Override
    public void emitWatermark(Watermark mark) {
        if (announcedStatus.isIdle()) {
            return;
        }

        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        serializationDelegate.setInstance(mark);

        try {
            recordWriter.broadcastEmit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    } {code}
It is then caught in `ChainingOutput#emitWatermark` and wrapped in an 
ExceptionInChainedOperatorException:
{code:java}
    @Override
    public void emitWatermark(Watermark mark) {
        if (announcedStatus.isIdle()) {
            return;
        }
        try {
            watermarkGauge.setCurrentWatermark(mark.getTimestamp());
            input.processWatermark(mark);
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    } {code}
Finally, it is caught in `ContinuousFileReaderOperator#finish`, which only logs 
a warning:
{code:java}
    @Override
    public void finish() throws Exception {
        LOG.debug("finishing");
        super.finish();

        switch (state) {
            case IDLE:
                switchState(ReaderState.FINISHED);
                break;
            case FINISHED:
                LOG.warn("operator is already closed, doing nothing");
                return;
            default:
                switchState(ReaderState.FINISHING);
                while (!state.isTerminal()) {
                    executor.yield();
                }
        }

        try {
            sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
        } catch (Exception e) {
            LOG.warn("unable to emit watermark while closing", e);
        }
    } {code}
Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation.
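
To make the three catch sites easier to follow, here is a minimal, self-contained sketch of the same wrapping pattern. It is not Flink code: the method names are hypothetical stand-ins, and IllegalStateException is used in place of ExceptionInChainedOperatorException so the sketch has no dependencies. It only shows that the original IOException stays reachable through the cause chain even though the outermost caller just logs it.

```java
import java.io.IOException;

/** Illustrative stand-ins only; none of these methods are Flink APIs. */
final class CauseChainSketch {

    // Stand-in for RecordWriter#emit: the original failure.
    static void emit() throws IOException {
        throw new IOException("simulated write failure");
    }

    // Stand-in for RecordWriterOutput#emitWatermark: rethrows as RuntimeException.
    static void emitFromWriterOutput() {
        try {
            emit();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // Stand-in for ChainingOutput#emitWatermark. IllegalStateException replaces
    // ExceptionInChainedOperatorException to keep the sketch dependency-free.
    static void emitFromChainingOutput() {
        try {
            emitFromWriterOutput();
        } catch (Exception e) {
            throw new IllegalStateException("Could not forward element to next operator", e);
        }
    }

    // Walks the getCause() links down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        try {
            emitFromChainingOutput();
        } catch (Exception e) {
            // Stand-in for ContinuousFileReaderOperator#finish: log and carry on.
            System.err.println("unable to emit watermark while closing: " + e);
            System.err.println("root cause: " + rootCause(e));
        }
    }
}
```

The task carries on after the last catch block, which matches the warning-level log entry shown earlier.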

In Flink (version 1.14.0), the full call stack for the workflow described 
above is:
{code:java}
org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
org.apache.flink.runtime.taskmanager.Task#doRun:766
org.apache.flink.runtime.taskmanager.Task#run:575
java.lang.Thread#run:748 {code}
We think the last few messages go missing from the Kafka topic because of 
`ChannelSelectorRecordWriter#broadcastEmit` (which also appears in the call 
stack above):
{code:java}
    @Override
    public void broadcastEmit(T record) throws IOException {
        checkErroneous();

        // Emitting to all channels in a for loop can be better than calling
        // ResultPartitionWriter#broadcastRecord because the broadcastRecord
        // method incurs extra overhead.
        ByteBuffer serializedRecord = serializeRecord(serializer, record);
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            serializedRecord.rewind();
            emit(record, channelIndex);   // line 67
        }

        if (flushAlways) {
            flushAll();
        }
    } {code}
Line 67 tries to emit `Watermark.MAX_WATERMARK` (from 
`ContinuousFileReaderOperator#finish`) to all channels. When the IOException is 
thrown there, the loop exits without running line 67 for the remaining 
channels, and `ContinuousFileReaderOperator#finish` swallows the exception. We 
reproduced the symptom multiple times and found that the number of missing 
messages is exactly equal to the number of affected channels.
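
The behavior described above can be reproduced in isolation with a small sketch. This is not the Flink implementation: the class, its fields and the simulated failure are hypothetical, and it only mirrors the shape of `emit`, `broadcastEmit` and `finish` to show that one failing channel also prevents delivery to every channel after it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a record writer; not Flink code. */
final class BroadcastAbortSketch {

    /** Channels that actually received the record. */
    final List<Integer> delivered = new ArrayList<>();
    private final int numberOfChannels;
    private final int failingChannel;

    BroadcastAbortSketch(int numberOfChannels, int failingChannel) {
        this.numberOfChannels = numberOfChannels;
        this.failingChannel = failingChannel;
    }

    // Mirrors the shape of emit(): may fail for a single channel.
    private void emit(String record, int channel) throws IOException {
        if (channel == failingChannel) {
            throw new IOException("simulated I/O failure on channel " + channel);
        }
        delivered.add(channel);
    }

    // Mirrors the shape of broadcastEmit(): the first failure ends the loop.
    void broadcastEmit(String record) throws IOException {
        for (int channel = 0; channel < numberOfChannels; channel++) {
            emit(record, channel);
        }
    }

    // Mirrors the shape of finish(): the failure is logged and swallowed.
    boolean finish() {
        try {
            broadcastEmit("MAX_WATERMARK");
            return true;
        } catch (Exception e) {
            System.err.println("unable to emit watermark while closing: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        BroadcastAbortSketch writer = new BroadcastAbortSketch(4, 1);
        writer.finish();
        // Only channels before the failing one are listed here.
        System.out.println("delivered to channels: " + writer.delivered);
    }
}
```

With 4 channels and a failure on channel 1, only channel 0 receives the record; channels 2 and 3 are skipped even though nothing is wrong with them.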

We therefore suspect that a potential IOException at line 67 is not handled 
properly: the current symptom and logging make it hard for the user to notice 
or debug the issue. The user may suddenly find that the last few messages are 
missing, and the only clue is a warning that an IOException occurred while 
emitting `Watermark.MAX_WATERMARK`. That warning does not explain why or how 
the messages went missing.

We would like to propose a fix for this issue. A simple solution is to catch 
the IOException at line 67, log it, and possibly retry the emit. We 
implemented this solution and the symptom disappeared. However, `broadcastEmit` 
is called from many places, so this fix would also affect the other callers, 
and we are not sure whether the new behavior is appropriate for them.
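
As a rough illustration of the idea (not the patch we tested, and not Flink code), the sketch below catches the failure per channel, logs it, retries a bounded number of times, and still visits the remaining channels. The `ChannelEmitter` interface, the `maxAttempts` parameter and the returned list of failed channels are all assumptions made for this example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a per-channel catch-and-retry loop; not Flink code. */
final class RetryingBroadcastSketch {

    /** Hypothetical per-channel sink; stands in for the real emit call. */
    interface ChannelEmitter {
        void emit(String record, int channel) throws IOException;
    }

    /**
     * Visits every channel even if an earlier one fails, trying each channel up to
     * maxAttempts times. Returns the channels that still failed after all attempts.
     */
    static List<Integer> broadcastEmit(
            ChannelEmitter emitter, String record, int numberOfChannels, int maxAttempts) {
        List<Integer> failedChannels = new ArrayList<>();
        for (int channel = 0; channel < numberOfChannels; channel++) {
            boolean sent = false;
            for (int attempt = 1; attempt <= maxAttempts && !sent; attempt++) {
                try {
                    emitter.emit(record, channel);
                    sent = true;
                } catch (IOException e) {
                    System.err.println("emit to channel " + channel
                            + " failed (attempt " + attempt + "): " + e.getMessage());
                }
            }
            if (!sent) {
                failedChannels.add(channel);
            }
        }
        return failedChannels;
    }

    public static void main(String[] args) {
        int[] calls = new int[3];
        // Channel 1 fails on its first attempt only.
        ChannelEmitter flaky = (record, channel) -> {
            calls[channel]++;
            if (channel == 1 && calls[channel] == 1) {
                throw new IOException("transient failure");
            }
        };
        List<Integer> failed = broadcastEmit(flaky, "MAX_WATERMARK", 3, 2);
        System.out.println("channels that never received the record: " + failed);
    }
}
```

Returning the failed channels leaves the decision to the caller, which could fail the task instead of continuing silently. Whether a retry is safe after a partially written record is an open question this sketch does not address.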

We are looking for suggestions and feedback. Thanks!

> IOException during MAX_WATERMARK emission causes message missing
> ----------------------------------------------------------------
>
>                 Key: FLINK-30032
>                 URL: https://issues.apache.org/jira/browse/FLINK-30032
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> We are testing Flink (version 1.14.0). We launch 1 
> StandaloneSessionClusterEntrypoint and 2 TaskManagerRunner processes, then 
> run a Flink client that submits a WordCount workload. The code is similar to 
> [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java];
>  the only change is that we add a Kafka topic as the output:
> {code:java}
>     private static DataStreamSink<String> addKafkaSink(
>             final DataStream<String> events, final String brokers, final String topic) {
>         return events.sinkTo(KafkaSink.<String>builder()
>                 .setBootstrapServers(brokers)
>                 .setRecordSerializer(
>                         KafkaRecordSerializationSchema.<String>builder()
>                                 .setValueSerializationSchema(new SimpleStringSchema())
>                                 .setTopic(topic)
>                                 .build())
>                 .build());
>     }
>     public static void run(final String[] args) throws Exception {
>         final String brokers = args[0];
>         final String textFilePath = args[1];
>         final String kafkaTopic = args[2];
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         final DataStream<String> text = env.readTextFile(textFilePath);
>         final DataStream<Tuple2<String, Integer>> counts =
>                 text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
>         addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
>         final long nano = System.nanoTime();
>         env.execute("WordCount");
>         FlinkGrayClientMain.reply("success", nano);
>     }
>  {code}
> We found that the Kafka topic sometimes fails to receive a few messages. We 
> reproduced the symptom multiple times. In those runs the topic receives 160 
> to 169 messages instead of the expected 170, and the missing messages are 
> always the last few of the 170 expected messages.
> We then inspected the logs and the code.
> First, one of the TaskManagerRunner processes logged an IOException:
> {code:java}
> 2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator 
> (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark 
> while closing
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
>         at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
> [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> Caused by: java.lang.RuntimeException: McGray injected exception
>         at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112)
>  ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         ... 24 more
> Caused by: java.io.IOException {code}
> The IOException is from line 104 in RecordWriter#emit:
> {code:java}
>     protected void emit(T record, int targetSubpartition) throws IOException {
>         checkErroneous();
>         targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104
>         if (flushAlways) {
>             targetPartition.flush(targetSubpartition);
>         }
>     } {code}
> Here, `targetPartition.emitRecord` eventually performs file I/O or 
> memory-mapped I/O, which is where the IOException is thrown.
> This exception is caught and rethrown as a RuntimeException in 
> `RecordWriterOutput#emitWatermark`:
> {code:java}
>     @Override
>     public void emitWatermark(Watermark mark) {
>         if (announcedStatus.isIdle()) {
>             return;
>         }
>         watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>         serializationDelegate.setInstance(mark);
>         try {
>             recordWriter.broadcastEmit(serializationDelegate);
>         } catch (Exception e) {
>             throw new RuntimeException(e.getMessage(), e);
>         }
>     } {code}
> It is then caught and wrapped again in `ChainingOutput#emitWatermark`:
> {code:java}
>     @Override
>     public void emitWatermark(Watermark mark) {
>         if (announcedStatus.isIdle()) {
>             return;
>         }
>         try {
>             watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>             input.processWatermark(mark);
>         } catch (Exception e) {
>             throw new ExceptionInChainedOperatorException(e);
>         }
>     } {code}
> Finally, it is caught and only logged in `ContinuousFileReaderOperator#finish`:
> {code:java}
>     @Override
>     public void finish() throws Exception {
>         LOG.debug("finishing");
>         super.finish();
>         switch (state) {
>             case IDLE:
>                 switchState(ReaderState.FINISHED);
>                 break;
>             case FINISHED:
>                 LOG.warn("operator is already closed, doing nothing");
>                 return;
>             default:
>                 switchState(ReaderState.FINISHING);
>                 while (!state.isTerminal()) {
>                     executor.yield();
>                 }
>         }
>         try {
>             sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
>         } catch (Exception e) {
>             LOG.warn("unable to emit watermark while closing", e);
>         }
>     } {code}
> Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation.
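The three catch sites above each wrap or log the original exception, so the IOException ends up two levels deep in the cause chain. As a minimal sketch in plain Java (not Flink code), the following reproduces that nesting and walks the chain back to the root cause. The class `RootCauseDemo` and its methods are hypothetical names, and `IllegalStateException` only stands in for `ExceptionInChainedOperatorException`.

```java
import java.io.IOException;

/** Hypothetical illustration of the wrapping chain; none of this is Flink code. */
final class RootCauseDemo {

    /** Follows getCause() until it reaches the innermost exception. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Builds the same nesting as the report: IOException inside two wrappers. */
    static Exception wrappedFailure() {
        IOException io = new IOException("simulated I/O failure");
        // Same shape as the rethrow in RecordWriterOutput#emitWatermark.
        RuntimeException rethrown = new RuntimeException(io.getMessage(), io);
        // Stand-in (assumption) for ExceptionInChainedOperatorException.
        return new IllegalStateException("Could not forward element to next operator", rethrown);
    }

    public static void main(String[] args) {
        // Prints: java.io.IOException: simulated I/O failure
        System.out.println(rootCause(wrappedFailure()));
    }
}
```

A log statement that also reports the root cause in its message may make this kind of failure easier to spot than a WARN line whose text only mentions the watermark.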
> In Flink (version 1.14.0), the full call stack for the workflow described 
> above is:
> {code:java}
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
> org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
> org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
> org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
> org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
> org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
> org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
> org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
> org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
> org.apache.flink.runtime.taskmanager.Task#doRun:766
> org.apache.flink.runtime.taskmanager.Task#run:575
> java.lang.Thread#run:748 {code}
> We think the last few messages go missing from the Kafka topic because of 
> `ChannelSelectorRecordWriter#broadcastEmit` (which also appears in the call 
> stack above):
> {code:java}
>     @Override
>     public void broadcastEmit(T record) throws IOException {
>         checkErroneous();
>         // Emitting to all channels in a for loop can be better than calling
>         // ResultPartitionWriter#broadcastRecord because the broadcastRecord
>         // method incurs extra overhead.
>         ByteBuffer serializedRecord = serializeRecord(serializer, record);
>         for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
>             serializedRecord.rewind();
>             emit(record, channelIndex);   // line 67
>         }
>         if (flushAlways) {
>             flushAll();
>         }
>     } {code}
> Line 67 tries to emit `Watermark.MAX_WATERMARK` (from 
> `ContinuousFileReaderOperator#finish`) to all channels. When the IOException 
> is thrown here, the loop is aborted, so line 67 never runs for the remaining 
> channels, and `ContinuousFileReaderOperator#finish` swallows the exception. 
> We reproduced the symptom multiple times and found that the number of 
> missing messages is exactly equal to the number of affected channels.
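The mechanism described above can be sketched outside Flink. This is a minimal illustration under stated assumptions: `BroadcastAbortDemo`, `failingChannel`, and the `delivered` list are hypothetical stand-ins, and the failure is simulated rather than coming from real I/O. It shows only that an unguarded loop stops at the first failing channel and that a caller which swallows the exception hides the skipped channels.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the broadcast loop; none of this is Flink code. */
final class BroadcastAbortDemo {

    /** Emits to each channel in order; the first failure aborts the whole loop. */
    static void broadcastEmit(int numberOfChannels, int failingChannel, List<Integer> delivered)
            throws IOException {
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            if (channelIndex == failingChannel) {
                throw new IOException("simulated failure on channel " + channelIndex);
            }
            delivered.add(channelIndex);
        }
    }

    /** Mimics a caller that only logs the exception, like the finish() method quoted above. */
    static List<Integer> finishSwallowing(int numberOfChannels, int failingChannel) {
        List<Integer> delivered = new ArrayList<>();
        try {
            broadcastEmit(numberOfChannels, failingChannel, delivered);
        } catch (Exception e) {
            System.err.println("unable to emit watermark while closing: " + e.getMessage());
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Channel 1 fails, so channels 1, 2 and 3 never receive anything.
        System.out.println(finishSwallowing(4, 1)); // prints [0]
    }
}
```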
> We therefore suspect that the potential IOException at line 67 is not 
> properly handled: the current symptom and logging make it hard for users to 
> notice or debug the issue. A user may suddenly find a few ending messages 
> missing and can only discover that some IOException occurred while emitting 
> `Watermark.MAX_WATERMARK`, without learning why or how the messages were 
> lost.
> We would like to propose a fix for this issue. A simple solution is to catch 
> the IOException at line 67, log it, and possibly retry the emit. 
> We implemented this solution and found that the symptom disappears. However, 
> `broadcastEmit` is called in many places, so this fix would also affect the 
> other callers, and we are not sure whether the new behavior is appropriate 
> for them.
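One possible shape for the catch, log, and retry idea is sketched below in plain Java. It is not the patch the reporters implemented: `GuardedBroadcastSketch`, `ChannelEmitter`, and `maxAttempts` are hypothetical names, and whether re-emitting to a channel after a partial write is safe in Flink is an open assumption that this sketch does not settle.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a per-channel guarded broadcast; none of this is Flink code. */
final class GuardedBroadcastSketch {

    /** Stand-in (assumption) for a per-channel emit such as RecordWriter#emit. */
    interface ChannelEmitter {
        void emit(int channelIndex) throws IOException;
    }

    /**
     * Tries every channel, retrying each one up to maxAttempts times,
     * and returns the indexes of the channels that still failed.
     */
    static List<Integer> broadcastEmit(
            ChannelEmitter emitter, int numberOfChannels, int maxAttempts) {
        List<Integer> failed = new ArrayList<>();
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            boolean emitted = false;
            for (int attempt = 1; attempt <= maxAttempts && !emitted; attempt++) {
                try {
                    emitter.emit(channelIndex);
                    emitted = true;
                } catch (IOException e) {
                    // Log per channel so the affected channel is visible to the user.
                    System.err.println("emit to channel " + channelIndex
                            + " failed (attempt " + attempt + " of " + maxAttempts + "): " + e);
                }
            }
            if (!emitted) {
                failed.add(channelIndex);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        int[] calls = new int[4];
        // Channel 1 fails once and then succeeds; channel 3 always fails.
        ChannelEmitter flaky = ch -> {
            calls[ch]++;
            if ((ch == 1 && calls[ch] == 1) || ch == 3) {
                throw new IOException("simulated failure");
            }
        };
        System.out.println(broadcastEmit(flaky, 4, 2)); // prints [3]
    }
}
```

Returning the failed channels instead of throwing leaves the decision to each caller, which may be one way to address the concern that `broadcastEmit` has many call sites with different requirements.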
> We are looking for suggestions and feedback. Thanks!



