Hello everyone,
A little background on the job: it has a relatively simple topology, with a 
single Kinesis ingress stream that uses EFO (enhanced fan-out), a few transform 
operations, and a DynamoDB sink.

Version: Flink 1.15

I’m running a Flink job on the AWS Managed Service, and am not sure if what I’m 
running into is a bug or user error. Any time we attempt to shut down or update 
the service (which I assume uses the “stop-with-savepoint” command), we 
encounter the following errors:

Message: unable to shutdown event loop
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.closeEventLoopUninterruptibly(NettyNioAsyncHttpClient.java:210)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.lambda$close$2(NettyNioAsyncHttpClient.java:199)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runAndLogError(NettyUtils.java:377)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.close(NettyNioAsyncHttpClient.java:198)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.close(KinesisProxyV2.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory.close(FanOutRecordPublisherFactory.java:97)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.closeRecordPublisherFactory(KinesisDataFetcher.java:843)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:817)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:410)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
    at //cutoff for brevity//

along with this:
Message: unable to close channel pools
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap.close(AwaitCloseChannelPoolMap.java:173)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runAndLogError(NettyUtils.java:377)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.close(NettyNioAsyncHttpClient.java:197)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.close(KinesisProxyV2.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory.close(FanOutRecordPublisherFactory.java:97)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.closeRecordPublisherFactory(KinesisDataFetcher.java:843)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:817)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:410)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap.close(AwaitCloseChannelPoolMap.java:170)
    ... 22 more
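
As far as I can tell, the "Caused by" frames above mean the task thread was 
interrupted while it was blocked in CompletableFuture.get() during close. To 
check my understanding I wrote a minimal, JDK-only sketch of that behaviour. It 
is not the SDK's or the connector's actual code; the class and method names 
(InterruptedCloseSketch, closeAndWait, interruptWhileClosing) are made up for 
illustration, and the wrapping into a RuntimeException is my assumption based 
on the trace.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

class InterruptedCloseSketch {

    // Hypothetical stand-in for a close() that waits on a future.
    // Assumption: the real close path wraps the InterruptedException in a
    // RuntimeException, as the stack trace suggests.
    static void closeAndWait(CompletableFuture<Void> closeFuture) {
        try {
            // Blocks until the future completes or this thread is interrupted.
            closeFuture.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before rethrowing.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    // Runs closeAndWait on a worker thread, interrupts that thread, and
    // returns whatever the worker threw (or null if it threw nothing).
    static Throwable interruptWhileClosing() {
        CompletableFuture<Void> neverCompletes = new CompletableFuture<>();
        AtomicReference<Throwable> thrown = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                closeAndWait(neverCompletes);
            } catch (RuntimeException e) {
                thrown.set(e);
            }
        });
        worker.start();
        worker.interrupt(); // simulates the task thread being interrupted
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("caller interrupted while joining", e);
        }
        return thrown.get();
    }

    public static void main(String[] args) {
        Throwable t = interruptWhileClosing();
        System.out.println(t + " / cause: " + t.getCause());
    }
}
```

If that reading is right, the two "unable to ..." messages would be a symptom 
of the task thread being interrupted during shutdown rather than the root 
cause, but I may be misreading the trace.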

I’ve done some digging in the logs and found some entries that lead me to 
believe we are somehow losing our connection to the Kinesis stream before 
checkpointing completes, causing the savepoint to fail:

"Error occurred on EFO subscription: org.apache.flink.kinesis.shaded.io.netty.channel.StacklessClosedChannelException - (null)."

An exceptionCaught() event was fired, and it reached at the tail of the 
pipeline. It usually means the last handler in the pipeline did not handle the 
exception.
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.FutureCancelledException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkCancellationException: Subscriber cancelled before all events were published
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$null$3(NettyRequestExecutor.java:136)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkCancellationException: Subscriber cancelled before all events were published
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onCancel(ResponseHandler.java:311
…22 more

A few more errors are thrown, but I think they all result from the errors 
above. I can include them if you think they would be helpful.

I’m very new to Flink so not sure if these are the result of a bug or user 
error, but wanted to throw this out there and see if anyone had any pointers. 
TIA!


