Hi all,

I'm trying to migrate a job from Flink 1.14 to 1.15.1. The job consumes from a Kinesis stream and writes to S3. The stop operation works well on 1.14; however, on 1.15.1 it fails (both with and without a savepoint).
The job fails with different exceptions depending on whether or not data is flowing through the Kinesis stream, but both are related to some kind of interruption. Is this a regression in 1.15, or have I misconfigured something?

When there is no data flowing through the stream:

2022-08-04 07:19:00,348 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
2022-08-04 07:19:00,350 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method) ~[?:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
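Some background on this first trace: `java.lang.InterruptedException: sleep interrupted` is ordinary JDK behavior when another thread interrupts a thread that is blocked in `Thread.sleep`. The frames suggest the shard consumer was sleeping in `PollingRecordPublisher.adjustRunLoopFrequency` when the shutdown reached it, but that is an inference from the trace, not something confirmed against the connector source. The sketch below reproduces only the JDK mechanism, outside Flink; `SleepInterruptDemo` and `interruptSleepingWorker` are hypothetical names, and the interrupt sent from the calling thread is an assumed stand-in for whatever the shutdown path does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SleepInterruptDemo {

    /**
     * Starts a worker that sleeps between polls, interrupts it, and reports whether
     * the worker observed InterruptedException and re-asserted its interrupt status.
     */
    public static boolean interruptSleepingWorker() {
        AtomicBoolean caught = new AtomicBoolean(false);
        AtomicBoolean statusRestored = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                // Hypothetical stand-in for a polling loop that sleeps between fetches
                Thread.sleep(TimeUnit.SECONDS.toMillis(30));
            } catch (InterruptedException e) {
                caught.set(true);
                // sleep() clears the interrupt status when it throws, so restore it
                Thread.currentThread().interrupt();
                statusRestored.set(Thread.currentThread().isInterrupted());
            }
        });
        worker.start();
        try {
            started.await();
            // Assumed stand-in for the interrupt delivered during shutdown
            worker.interrupt();
            worker.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return caught.get() && statusRestored.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptSleepingWorker());
    }
}
```

Running `main` prints `true`: the interrupt wakes the sleeping worker with an `InterruptedException`, and the worker restores its interrupt status instead of swallowing it.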
When there is data flowing through the stream:

Caused by: java.io.IOException: Interrupted while waiting for buffer
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:405) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:377) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:315) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:162) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.15.1.jar:1.15.1]
    ... 32 more
2022-08-04 06:53:24,398 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 8 ...
java.lang.InterruptedException: null
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(Unknown Source) ~[?:?]
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) ~[?:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:870) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-9a850919e2c28ace0fbb911415e3a696:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:760) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-9a850919e2c28ace0fbb911415e3a696:?]
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-9a850919e2c28ace0fbb911415e3a696:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332) ~[flink-dist-1.15.1.jar:1.15.1]
2022-08-04 06:53:24,422 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 8 ...
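Both exceptions in the data-flowing case also look like interrupts delivered to blocked threads. Going by the frames, the task thread was waiting for a free network buffer (`requestNewBufferBuilderFromPool`), and the fetcher thread was waiting in `ThreadPoolExecutor.awaitTermination`. The sketch below reproduces the two JDK-level patterns outside Flink, under stated assumptions. A full `ArrayBlockingQueue` stands in for an exhausted buffer pool; this is an analogy, not how Flink's buffer pool is implemented. `ShutdownInterruptDemo`, `emit`, `interruptBlockedProducer`, and `interruptAwaitTermination` are hypothetical names. The `null` message in the second trace is consistent with the JDK, because `awaitNanos` throws an `InterruptedException` that carries no message.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownInterruptDemo {

    // Hypothetical stand-in for emitting a record while no output buffer is free
    static void emit(BlockingQueue<String> buffers, String record) throws IOException {
        try {
            buffers.put(record); // blocks while the queue is full (backpressure analogy)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IOException("Interrupted while waiting for buffer", e);
        }
    }

    /** Interrupts a producer blocked on a full queue and returns what it failed with. */
    public static Throwable interruptBlockedProducer() {
        BlockingQueue<String> buffers = new ArrayBlockingQueue<>(1);
        buffers.add("occupied"); // queue is now full and nothing drains it
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread producer = new Thread(() -> {
            try {
                emit(buffers, "record");
            } catch (IOException e) {
                failure.set(e);
            }
        });
        producer.start();
        producer.interrupt();
        try {
            producer.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    /** Interrupts a thread waiting in awaitTermination and returns what it failed with. */
    public static Throwable interruptAwaitTermination() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch taskRunning = new CountDownLatch(1);
        pool.submit(() -> {
            taskRunning.countDown();
            try {
                Thread.sleep(30_000); // long-running task keeps the pool from terminating
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread waiter = new Thread(() -> {
            try {
                pool.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                failure.set(e);
            }
        });
        try {
            taskRunning.await();
            pool.shutdown(); // stop accepting tasks; the running task continues
            waiter.start();
            waiter.interrupt();
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupt the sleeping task so the pool can exit
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptBlockedProducer());
        System.out.println(interruptAwaitTermination());
    }
}
```

In both helpers the blocked thread only fails because something interrupts it, which is why the question of what interrupts these threads during stop on 1.15.1 (and not on 1.14) seems to be the relevant one.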