Hello everyone,

A little background on the job: it has a relatively simple topology, with a single Kinesis ingress stream that uses EFO (enhanced fan-out), a few transform operations, and a sink to DynamoDB.
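For readers unfamiliar with EFO, a Kinesis source of this kind is typically configured through a Properties object along the following lines. This is only a sketch, not the job's actual code: the region, consumer name, and initial position are placeholder values, and the key strings are, to my understanding, the values behind the connector's AWSConfigConstants and ConsumerConfigConstants fields, so please check them against the connector version in use.

```java
import java.util.Properties;

public class EfoConsumerConfigSketch {

    // Builds consumer properties that select the EFO record publisher.
    // The key strings are assumed to match the connector's constants
    // (named in the trailing comments); the values passed in are placeholders.
    static Properties buildConsumerConfig(String region, String efoConsumerName) {
        Properties config = new Properties();
        config.setProperty("aws.region", region);                             // AWSConfigConstants.AWS_REGION
        config.setProperty("flink.stream.initpos", "LATEST");                 // ConsumerConfigConstants.STREAM_INITIAL_POSITION
        config.setProperty("flink.stream.recordpublisher", "EFO");            // ConsumerConfigConstants.RECORD_PUBLISHER_TYPE
        config.setProperty("flink.stream.efo.consumername", efoConsumerName); // ConsumerConfigConstants.EFO_CONSUMER_NAME
        return config;
    }

    public static void main(String[] args) {
        // Hypothetical region and consumer name, for illustration only.
        Properties config = buildConsumerConfig("us-east-1", "my-efo-consumer");
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real job these properties would be handed to the FlinkKinesisConsumer constructor together with the stream name and a deserialization schema.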
Version: Flink 1.15

I’m running a Flink job on the AWS Managed Service, and am not sure whether what I’m running into is a bug or user error. Any time we attempt to shut down or update the service (which I assume uses the “stop-with-savepoint” command), we encounter the following errors:

Message: unable to shutdown event loop
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.closeEventLoopUninterruptibly(NettyNioAsyncHttpClient.java:210)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.lambda$close$2(NettyNioAsyncHttpClient.java:199)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runAndLogError(NettyUtils.java:377)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.close(NettyNioAsyncHttpClient.java:198)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.close(KinesisProxyV2.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory.close(FanOutRecordPublisherFactory.java:97)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.closeRecordPublisherFactory(KinesisDataFetcher.java:843)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:817)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:410)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
    at //cutoff for brevity//

along with this:

Message: unable to close channel pools
java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap.close(AwaitCloseChannelPoolMap.java:173)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runAndLogError(NettyUtils.java:377)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient.close(NettyNioAsyncHttpClient.java:197)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2.close(KinesisProxyV2.java:91)
    at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherFactory.close(FanOutRecordPublisherFactory.java:97)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.closeRecordPublisherFactory(KinesisDataFetcher.java:843)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:817)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:410)
    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:385)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.AwaitCloseChannelPoolMap.close(AwaitCloseChannelPoolMap.java:170)
    ... 22 more

I’ve done some digging in the logs and found some things that lead me to believe we are somehow losing our connection to the Kinesis stream before checkpointing is complete, causing the savepoint to fail:

“Error occurred on EFO subscription: org.apache.flink.kinesis.shaded.io.netty.channel.StacklessClosedChannelException - (null).”

An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.FutureCancelledException: org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkCancellationException: Subscriber cancelled before all events were published
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$null$3(NettyRequestExecutor.java:136)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:106)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkCancellationException: Subscriber cancelled before all events were published
    at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onCancel(ResponseHandler.java:311 …22 more

There are a few more errors that are thrown, but I think they are all a result of the errors above. I can include them if you think they would be helpful.

I’m very new to Flink, so I’m not sure whether these are the result of a bug or user error, but I wanted to throw this out there and see if anyone has any pointers.

TIA!