Hello!

(Apache Flink1.8 on AWS EMR release label 5.28.x)

Our data source is an AWS Kinesis stream (with 450 shards if that matters). We 
use the FlinkKinesisConsumer to read the kinesis stream. Our application 
occasionally (once every couple of days) crashes with a "Target server failed 
to respond" error. The full stack trace is at the bottom.

Looking more into the codebase I found out that 
'ProvisionedThroughputExceededException' are the only exception types that are 
retried on. 
Code<https://github.com/apache/flink/blob/2b0a8ceeb131c938d2e41dfee66099bfa5f366ae/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L365>
1. Wondering why a transient http response exception is not retried by the 
kinesis connector?
2. Is there a way I can pass in a retry configuration that will retry on these 
errors?

As a side note, we set the following retry configuration -

env.setRestartStrategy(RestartStrategies.failureRateRestart(12,

      org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),

                org.apache.flink.api.common.time.Time.of(300, 
TimeUnit.SECONDS)));

Full stack trace of the exception -

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)

    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)

    at 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)

    at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)

    at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)

    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)

(Shamelessly copy pasting from the stack overflow question I posted 
https://stackoverflow.com/questions/62399248/flinkkinesisconsumer-does-not-retry-on-nohttpresponseexception
 )

--
KP

Reply via email to