Hi,

It's hard to say from the log fragment, but I presume the task has correctly switched to the "CANCELLED" state and this error should not have been logged as an ERROR, right? How did you get this stack trace? Maybe it was logged as a DEBUG message? If not, that would probably be a minor bug in Flink; could you post a larger fragment of the log, including the stack trace and the log line that printed it?

In short, this kind of exception is a normal thing to happen and is expected when cancelling a job. If your code is busy and blocked while being backpressured (as your FlatMap operation was in this particular case), interrupting it is a standard thing for Flink to do. However, the exception shouldn't bubble up to the end user, exactly for that reason: to not confuse users.
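To illustrate what "busy and blocked while being backpressured" means, here is a minimal, hypothetical sketch (the class name and logic are made up for illustration, not taken from your job). Any out.collect() call inside a flatMap can block waiting for a free network buffer when the downstream window operator can't keep up, and cancellation interrupts exactly that wait:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public class ParseLineSketch implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String field : line.split(",")) {
                // Under backpressure this call eventually blocks in
                // BufferWritingResultPartition#requestNewBufferBuilderFromPool,
                // waiting for an output buffer. Cancelling the job interrupts
                // that wait, which surfaces as the "java.io.IOException:
                // Interrupted while waiting for buffer" you saw. This is
                // expected and harmless during cancellation.
                out.collect(field);
            }
        }
    }

That is also why the interrupt surfaces deep inside Flink's network stack rather than in your own code.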
> some TMs cancel successfully, some TMs get stuck in CANCELING and kill themselves once task.cancellation.timeout = 180000 is reached

This part is a bit confusing to me. The interruption described above should actually prevent this timeout from ever kicking in, so the TM shouldn't be killed. Again, could you post a larger part of the TM/JM logs, or even better, the full TM/JM logs?
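For reference, these are the two settings involved in this behaviour (a sketch of the relevant flink-conf.yaml entries; the values shown are, as far as I know, the defaults, in milliseconds):

    task.cancellation.interval: 30000     # pause between repeated interrupt attempts
    task.cancellation.timeout: 180000     # if cancellation still hangs after this, the TM kills itself

Normally the interrupts unblock the task long before the timeout is reached, which is why the self-kill you describe surprises me.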
Best,
Piotrek

On Sat, 26 Jun 2021 at 04:59, SmileSmile <a511955...@163.com> wrote:

> Hi
>
> I use Flink 1.12.4 on YARN. The job topology is: kafka -> source ->
> flatmap -> window 1 min agg -> sink -> kafka. Checkpointing is enabled
> with a checkpoint interval of 20s. When I cancel my job, some TMs cancel
> successfully, while others get stuck in CANCELING and kill themselves
> once task.cancellation.timeout = 180000 is reached. The TM log shows:
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:114) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:74) [testFlink-1.0.jar:?]
>     at com.operation.ParseLineOperationForAgg.flatMap(ParseLineOperationForAgg.java:29) [testFlink-1.0.jar:?]
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) [flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) [flink-dist_2.11-1.12.4.jar:1.12.4]
> Caused by: java.io.IOException: Interrupted while waiting for buffer
>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:72) ~[testFlink-1.0.jar:?]
>     at com.operation.ExtractLineOperationAgg.flatMap(ExtractLineOperationAgg.java:28) ~[testFlink-1.0.jar:?]
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112) ~[flink-dist_2.11-1.12.4.jar:1.12.4]
>     ... 32 more
>
> My questions:
>
> 1. What can I do to deal with this error?
> 2. If I cancel the job with a savepoint, will this error affect the savepoint?
>
> Best!
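For reference on question 2 in the quoted mail, these are roughly the commands involved (a sketch, not from the thread; the savepoint directory and job id are placeholders, assuming the Flink 1.12 CLI):

    # preferred: stop the job with a savepoint (sources finish cleanly first)
    ./bin/flink stop --savepointPath /tmp/flink-savepoints <jobId>

    # older, deprecated variant: cancel with a savepoint
    ./bin/flink cancel -s /tmp/flink-savepoints <jobId>

In both cases the savepoint is taken before the actual cancellation starts, so an interruption like the one above would only happen after the savepoint has already completed.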