Hi all,

I'm trying to migrate a job from Flink 1.14 to 1.15.1. The job consumes from a 
Kinesis stream and writes to S3.
The stop operation works well on 1.14; however, on 1.15.1 it fails (both with 
and without a savepoint).
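For reference, these are roughly the commands I'm running (the job ID and savepoint path below are illustrative):

    # stop with savepoint
    ./bin/flink stop --savepointPath s3://my-bucket/savepoints <jobId>

    # "without savepoint" I mean cancelling the job
    ./bin/flink cancel <jobId>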

The job fails with different exceptions depending on whether data is flowing 
through the Kinesis stream or not, but both are related to some kind of 
interruption.

Is this a regression in 1.15, or have I misconfigured something?

When there is no data flowing through the stream:

2022-08-04 07:19:00,348 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 0 ...
2022-08-04 07:19:00,350 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
java.lang.InterruptedException: sleep interrupted
       at java.lang.Thread.sleep(Native Method) ~[?:?]
       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
       at org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
       at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-e4459909448cf1de8985bd75b506fe03:?]
       at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
       at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
       at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
       at java.lang.Thread.run(Unknown Source) [?:?]


When there is data flowing through the stream:

Caused by: java.io.IOException: Interrupted while waiting for buffer
       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:405) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:377) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:315) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:162) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:106) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:104) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:90) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.15.1.jar:1.15.1]
       ... 32 more
2022-08-04 06:53:24,398 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 8 ...
java.lang.InterruptedException: null
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(Unknown Source) ~[?:?]
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) ~[?:?]
       at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) ~[?:?]
       at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:870) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-9a850919e2c28ace0fbb911415e3a696:?]
       at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:760) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-9a850919e2c28ace0fbb911415e3a696:?]
       at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392) ~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-9a850919e2c28ace0fbb911415e3a696:?]
       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.15.1.jar:1.15.1]
       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332) ~[flink-dist-1.15.1.jar:1.15.1]
2022-08-04 06:53:24,422 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Shutting down the shard consumer threads of subtask 8 ...
