[ https://issues.apache.org/jira/browse/FLINK-30304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer updated FLINK-30304: ---------------------------------- Attachment: sink-deadlock.png > Possible Deadlock in Kinesis/Firehose/DynamoDB Connector > -------------------------------------------------------- > > Key: FLINK-30304 > URL: https://issues.apache.org/jira/browse/FLINK-30304 > Project: Flink > Issue Type: Bug > Components: Connectors / DynamoDB, Connectors / Firehose, Connectors > / Kinesis > Affects Versions: 1.16.0, 1.15.3, aws-connector-3.0.0 > Reporter: Danny Cranmer > Assignee: Danny Cranmer > Priority: Critical > Fix For: 1.17.0, 1.16.1, 1.15.4, aws-connector-4.0.0, > aws-connector-3.1.0 > > Attachments: sink-deadlock.png > > > AWS Sinks based on Async Sink can enter a deadlock situation if the AWS async > client throws error outside of the future. We observed this with a local > application: > {code:java} > java.lang.NullPointerException > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114) > at > org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) > at > org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at java.base/java.lang.Thread.run(Thread.java:829){code} > Related AWS SDK issues that can cause this: > * [https://github.com/aws/aws-sdk-java-v2/issues/3435] > * [https://github.com/aws/aws-sdk-java-v2/issues/1812] > If an error is thrown and not handled by the future then the AsyncSink will > never decrement {{inFlightRequestCount}}. the job will hang while trying > flush for checkpoint -- This message was sent by Atlassian Jira (v8.20.10#820010)