[ 
https://issues.apache.org/jira/browse/FLINK-37648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh updated FLINK-37648:
------------------------------------
    Fix Version/s: aws-connector-5.1.0

> Flink Kinesis connector lost track of 1 shard.
> ----------------------------------------------
>
>                 Key: FLINK-37648
>                 URL: https://issues.apache.org/jira/browse/FLINK-37648
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.20.1
>         Environment: Flink version: 1.20
> flink-connector-aws-kinesis-streams' version: 5.0.0-1.20
> kinesis stream's shard number: 2
> consumer type: EFO
> parallelism: 1
>            Reporter: roland
>            Priority: Major
>             Fix For: aws-connector-5.1.0
>
>
> Hi,
> I have a Flink 1.20 job that uses flink-connector-aws-kinesis-streams to 
> consume from a Kinesis stream with 2 shards using EFO.
> The job ran for 40 days without any issue; then, at some point, it stopped 
> subscribing to both shards and only subscribed to shard-0. 
>  
> 1. Before 2025-04-08 07:20 (UTC), the Flink job subscribed to both shards:
>  
>  
> {code:java}
> 2025-04-08 07:12:57,788 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Subscription complete - shardId-000000000001 
> 2025-04-08 07:12:57,788 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Activating subscription to shard shardId-000000000001 with starting 
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661185042803593150728679800204632415527945304203591698} for 
> consumer arn:xxx
> 2025-04-08 07:12:57,841 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Successfully subscribed to shard shardId-000000000001 at 
> StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661185042803593150728679800204632415527945304203591698} 
> using consumer arn:xxx
> 2025-04-08 07:12:57,841 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Successfully subscribed to shard shardId-000000000001 with starting 
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661185042803593150728679800204632415527945304203591698} for 
> consumer arn:xxx
> 2025-04-08 07:15:06,360 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Activating subscription to shard shardId-000000000000 with starting 
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661066447418326619680247193290876689240616667852046338} for 
> consumer arn:xxx{code}
> As shown above, both shard-0 and shard-1 were subscribed to and consumed.
> 2. After that timestamp, the Flink job only subscribed to 1 shard:
>  
> {code:java}
> 2025-04-08 07:20:06,432 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Subscription complete - shardId-000000000000 (arn:
> 2025-04-08 07:20:06,433 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Activating subscription to shard shardId-000000000000 with starting 
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661066447418326619680247330352841488049219966007771138} for 
> consumer arn:
> 2025-04-08 07:20:06,442 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Successfully subscribed to shard shardId-000000000000 at 
> StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661066447418326619680247330352841488049219966007771138} 
> using consumer arn:
> 2025-04-08 07:20:06,442 INFO  
> org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription
>  [] - Successfully subscribed to shard shardId-000000000000 with starting 
> position StartingPosition{shardIteratorType=AFTER_SEQUENCE_NUMBER, 
> startingMarker=49661066447418326619680247330352841488049219966007771138} for 
> consumer arn:. {code}
>  
>  
> There are no exception logs in between; however, I found that this WARN log 
> appeared every 40 minutes:
>  
> {code:java}
> 2025-04-09 09:35:29,477 WARN  io.netty.channel.DefaultChannelPipeline         
>              [] - An exceptionCaught() event was fired, and it reached at the 
> tail of the pipeline. It usually means the last handler in the pipeline did 
> not handle the exception.
> java.io.IOException: An error occurred on the connection: 
> java.nio.channels.ClosedChannelException, [channel: 086eb48c]. All streams 
> will be closed
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.decorateConnectionException(MultiplexedChannelRecord.java:213)
>  
> ~[blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeChildChannels$10(MultiplexedChannelRecord.java:205)
>  
> ~[blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeAndExecuteOnChildChannels$11(MultiplexedChannelRecord.java:229)
>  
> ~[blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop(NettyUtils.java:248)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeAndExecuteOnChildChannels(MultiplexedChannelRecord.java:220)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeChildChannels(MultiplexedChannelRecord.java:205)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:353)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:333)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.access$200(Http2MultiplexedChannelPool.java:76)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.closeAndReleaseParent(Http2MultiplexedChannelPool.java:509)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.channelInactive(Http2MultiplexedChannelPool.java:486)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> software.amazon.awssdk.http.nio.netty.internal.http2.Http2PingHandler.channelInactive(Http2PingHandler.java:83)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:428)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1172) 
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1402)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:900)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:811)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) 
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
>  
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
> [blob_p-30497883442822f5c82caef062655823d22e6214-d096e666a38e1d8371c3989c721c6564:?]
>       at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.nio.channels.ClosedChannelException
>       ... 41 more {code}
>  
>  
> My questions are:
> 1. Why did the job stop subscribing to shard-1?
> 2. Why does the Flink job keep running without any exceptions?
>  
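
The quoted shard-0 entries show a new "Activating subscription" line every five minutes (07:15:06, then 07:20:06), which is consistent with the five-minute lifetime of a Kinesis SubscribeToShard call. A shard whose last activation is much older than the newest log entry has therefore probably dropped out. The following is a minimal sketch of a log check built on that observation, not part of the connector. It assumes each log event sits on a single unwrapped line in the format quoted above; the function names, the abbreviated sample lines, and the 6-minute threshold are all illustrative assumptions.

```python
import re
from datetime import datetime, timedelta

# Timestamp prefix used by the quoted log lines, e.g. "2025-04-08 07:12:57,788 ".
TIMESTAMP = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\s")
# The "Activating subscription" INFO message quoted in the report.
ACTIVATION = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\s+INFO\s+.*"
    r"Activating subscription to shard (shardId-\d+)"
)


def parse_ts(text):
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def last_activations(lines):
    """Return ({shard_id: last activation time}, newest timestamp in the log)."""
    last, latest = {}, None
    for line in lines:
        ts_match = TIMESTAMP.match(line)
        if ts_match:
            ts = parse_ts(ts_match.group(1))
            latest = ts if latest is None else max(latest, ts)
        act_match = ACTIVATION.match(line)
        if act_match:
            last[act_match.group(2)] = parse_ts(act_match.group(1))
    return last, latest


def stale_shards(lines, max_gap=timedelta(minutes=6)):
    """Shards whose last activation is older than max_gap (assumed threshold)."""
    last, latest = last_activations(lines)
    return sorted(shard for shard, ts in last.items() if latest - ts > max_gap)


# Sample lines abbreviated from the logs quoted in the report ("..." marks the cut).
LOGGER = ("org.apache.flink.connector.kinesis.source.reader.fanout."
          "FanOutKinesisShardSubscription")
SAMPLE = [
    f"2025-04-08 07:12:57,788 INFO  {LOGGER} [] - Activating subscription to shard shardId-000000000001 with starting position ...",
    f"2025-04-08 07:15:06,360 INFO  {LOGGER} [] - Activating subscription to shard shardId-000000000000 with starting position ...",
    f"2025-04-08 07:20:06,433 INFO  {LOGGER} [] - Activating subscription to shard shardId-000000000000 with starting position ...",
]

if __name__ == "__main__":
    print(stale_shards(SAMPLE))
```

On the sample lines, shard-1 was last activated at 07:12:57 and the newest entry is at 07:20:06, so the gap exceeds the assumed 6-minute threshold and shard-1 is reported as stale. The check can only flag shards that appear at least once in the scanned log window.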



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
