Hi Flink users,

I am using the Kinesis connector from Flink 1.12.5, with enhanced fan-out (EFO), to consume data from Kinesis.

From time to time I get an IOException or a StacklessClosedChannelException.
Once such an error has been retried the default 10 times, it fails the Flink
source operator, which triggers a restart of the entire job.

I have two questions:

   1. Is there a better way to handle these errors without restarting the
   Flink job? Restarting the job is time-consuming and often slows down data
   consumption.
   2. These two errors are treated as retryable exceptions rather than
   recoverable exceptions. Does that mean they cannot be recovered from
   without restarting the Flink source operator?
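To check that I understand the retry accounting, here is a toy model of it in plain Java. It is my own simplified sketch, not the connector's code: the names `BoundedRetryModel`, `RetryableException`, `runWithRetries`, and the backoff parameters are made up for illustration, and I am assuming the connector retries a retryable error with exponential backoff up to a fixed limit (10 by default) before letting it propagate and fail the source operator.

```java
import java.util.concurrent.Callable;

/**
 * Toy model (hypothetical, NOT the Kinesis connector's code) of a bounded
 * retry loop with exponential backoff: a retryable error is retried up to
 * maxRetries times; after that it is rethrown, which in Flink would fail
 * the source operator.
 */
public class BoundedRetryModel {

    /** Hypothetical stand-in for the connector's retryable exception. */
    static class RetryableException extends Exception {
        RetryableException(String message) { super(message); }
    }

    /** Backoff for a given attempt: baseMillis * 2^attempt, capped at maxMillis. */
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        double delay = baseMillis * Math.pow(2.0, attempt);
        return (long) Math.min(delay, maxMillis);
    }

    /**
     * Runs the task, retrying on RetryableException. Returns the task's result,
     * or rethrows once maxRetries retries have been used up.
     */
    static <T> T runWithRetries(Callable<T> task, int maxRetries,
                                long baseMillis, long maxMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return task.call();
            } catch (RetryableException e) {
                if (attempt >= maxRetries) {
                    throw e; // retry budget exhausted: the failure propagates
                }
                Thread.sleep(backoffMillis(attempt, baseMillis, maxMillis));
                attempt++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated subscription that fails 3 times with a transient error, then succeeds.
        String result = runWithRetries(() -> {
            if (++calls[0] <= 3) {
                throw new RetryableException("connection closed");
            }
            return "records";
        }, 10, 1, 5);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

With `maxRetries` set to 10 this prints `records after 4 calls`: the three transient failures are absorbed by the retries, and only an error that outlasts the retry budget would reach the job.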


Here are the exceptions:

org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$RetryableFanOutSubscriberException: org.apache.flink.kinesis.shaded.io.netty.channel.StacklessClosedChannelException
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.handleError(FanOutShardSubscriber.java:296) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:363) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154) [blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]

or

org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber$RetryableFanOutSubscriberException: java.io.IOException: An error occurred on the connection: null
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.handleError(FanOutShardSubscriber.java:296) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:363) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188) ~[blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154) [blob_p-6581ced9ade704dee24c2632e701b413d2e656ba-f7515b4b21d25cb44b07411a9ad294e8:?]



Thanks

Leon
