[ https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583372#comment-16583372 ]

ASF GitHub Bot commented on FLINK-10020:
----------------------------------------

asfgit closed pull request #6482: [FLINK-10020] [kinesis] Support recoverable 
exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 48a0b3c9559..443b19ec382 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -92,6 +92,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
        /** The power constant for exponential backoff between each 
describeStream attempt. */
        public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT 
= "flink.stream.describe.backoff.expconst";
 
+       /** The maximum number of listShards attempts if we get a recoverable 
exception. */
+       public static final String LIST_SHARDS_RETRIES = 
"flink.list.shards.maxretries";
+
        /** The base backoff time between each listShards attempt. */
        public static final String LIST_SHARDS_BACKOFF_BASE = 
"flink.list.shards.backoff.base";
 
@@ -104,7 +107,7 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
        /** The maximum number of records to try to get each time we fetch 
records from a AWS Kinesis shard. */
        public static final String SHARD_GETRECORDS_MAX = 
"flink.shard.getrecords.maxrecordcount";
 
-       /** The maximum number of getRecords attempts if we get 
ProvisionedThroughputExceededException. */
+       /** The maximum number of getRecords attempts if we get a recoverable 
exception. */
        public static final String SHARD_GETRECORDS_RETRIES = 
"flink.shard.getrecords.maxretries";
 
        /** The base backoff time between getRecords attempts if we get a 
ProvisionedThroughputExceededException. */
@@ -161,6 +164,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 
        public static final double 
DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
+       public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;
+
        public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;
 
        public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7e6a3604414..262181ae3bc 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -91,6 +91,9 @@
        /** Exponential backoff power constant for the list shards operation. */
        private final double listShardsExpConstant;
 
+       /** Maximum retry attempts for the list shards operation. */
+       private final int listShardsMaxRetries;
+
        // 
------------------------------------------------------------------------
        //  getRecords() related performance settings
        // 
------------------------------------------------------------------------
@@ -104,8 +107,8 @@
        /** Exponential backoff power constant for the get records operation. */
        private final double getRecordsExpConstant;
 
-       /** Maximum attempts for the get records operation. */
-       private final int getRecordsMaxAttempts;
+       /** Maximum retry attempts for the get records operation. */
+       private final int getRecordsMaxRetries;
 
        // 
------------------------------------------------------------------------
        //  getShardIterator() related performance settings
@@ -120,8 +123,8 @@
        /** Exponential backoff power constant for the get shard iterator 
operation. */
        private final double getShardIteratorExpConstant;
 
-       /** Maximum attempts for the get shard iterator operation. */
-       private final int getShardIteratorMaxAttempts;
+       /** Maximum retry attempts for the get shard iterator operation. */
+       private final int getShardIteratorMaxRetries;
 
        /**
         * Create a new KinesisProxy based on the supplied configuration 
properties.
@@ -146,6 +149,10 @@ protected KinesisProxy(Properties configProps) {
                        configProps.getProperty(
                                
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
                                
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
+               this.listShardsMaxRetries = Integer.valueOf(
+                       configProps.getProperty(
+                               ConsumerConfigConstants.LIST_SHARDS_RETRIES,
+                               
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));
 
                this.getRecordsBaseBackoffMillis = Long.valueOf(
                        configProps.getProperty(
@@ -159,7 +166,7 @@ protected KinesisProxy(Properties configProps) {
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
                                
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
-               this.getRecordsMaxAttempts = Integer.valueOf(
+               this.getRecordsMaxRetries = Integer.valueOf(
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
                                
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
@@ -176,7 +183,7 @@ protected KinesisProxy(Properties configProps) {
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
                                
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
-               this.getShardIteratorMaxAttempts = Integer.valueOf(
+               this.getShardIteratorMaxRetries = Integer.valueOf(
                        configProps.getProperty(
                                
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
                                
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
@@ -217,14 +224,14 @@ public GetRecordsResult getRecords(String shardIterator, 
int maxRecordsToGet) th
 
                GetRecordsResult getRecordsResult = null;
 
-               int attempt = 0;
-               while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
+               int retryCount = 0;
+               while (retryCount <= getRecordsMaxRetries && getRecordsResult 
== null) {
                        try {
                                getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
                        } catch (SdkClientException ex) {
                                if (isRecoverableSdkClientException(ex)) {
                                        long backoffMillis = fullJitterBackoff(
-                                               getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+                                               getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
                                        LOG.warn("Got recoverable 
SdkClientException. Backing off for "
                                                + backoffMillis + " millis (" + 
ex.getMessage() + ")");
                                        Thread.sleep(backoffMillis);
@@ -235,7 +242,7 @@ public GetRecordsResult getRecords(String shardIterator, 
int maxRecordsToGet) th
                }
 
                if (getRecordsResult == null) {
-                       throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxAttempts +
+                       throw new RuntimeException("Rate Exceeded for 
getRecords operation - all " + getRecordsMaxRetries +
                                " retry attempts returned 
ProvisionedThroughputExceededException.");
                }
 
@@ -292,14 +299,14 @@ public String getShardIterator(StreamShardHandle shard, 
String shardIteratorType
        private String getShardIterator(GetShardIteratorRequest 
getShardIteratorRequest) throws InterruptedException {
                GetShardIteratorResult getShardIteratorResult = null;
 
-               int attempt = 0;
-               while (attempt <= getShardIteratorMaxAttempts && 
getShardIteratorResult == null) {
+               int retryCount = 0;
+               while (retryCount <= getShardIteratorMaxRetries && 
getShardIteratorResult == null) {
                        try {
                                        getShardIteratorResult = 
kinesisClient.getShardIterator(getShardIteratorRequest);
                        } catch (AmazonServiceException ex) {
                                if (isRecoverableException(ex)) {
                                        long backoffMillis = fullJitterBackoff(
-                                               
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, 
getShardIteratorExpConstant, attempt++);
+                                               
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, 
getShardIteratorExpConstant, retryCount++);
                                        LOG.warn("Got recoverable 
AmazonServiceException. Backing off for "
                                                + backoffMillis + " millis (" + 
ex.getErrorMessage() + ")");
                                        Thread.sleep(backoffMillis);
@@ -310,7 +317,7 @@ private String getShardIterator(GetShardIteratorRequest 
getShardIteratorRequest)
                }
 
                if (getShardIteratorResult == null) {
-                       throw new RuntimeException("Rate Exceeded for 
getShardIterator operation - all " + getShardIteratorMaxAttempts +
+                       throw new RuntimeException("Rate Exceeded for 
getShardIterator operation - all " + getShardIteratorMaxRetries +
                                " retry attempts returned 
ProvisionedThroughputExceededException.");
                }
                return getShardIteratorResult.getShardIterator();
@@ -406,16 +413,16 @@ private ListShardsResult listShards(String streamName, 
@Nullable String startSha
                ListShardsResult listShardsResults = null;
 
                // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
-               int attemptCount = 0;
+               int retryCount = 0;
                // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
                // are taken up.
-               while (listShardsResults == null) { // retry until we get a 
result
+               while (retryCount <= listShardsMaxRetries && listShardsResults 
== null) { // retry until we get a result
                        try {
 
                                listShardsResults = 
kinesisClient.listShards(listShardsRequest);
                        } catch (LimitExceededException le) {
                                long backoffMillis = fullJitterBackoff(
-                                               listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+                                               listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
                                        LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
                                                                        + ". 
Backing off for " + backoffMillis + " millis.");
                                Thread.sleep(backoffMillis);
@@ -433,6 +440,18 @@ private ListShardsResult listShards(String streamName, 
@Nullable String startSha
                        } catch (ExpiredNextTokenException expiredToken) {
                                LOG.warn("List Shards has an expired token. 
Reusing the previous state.");
                                break;
+                       } catch (SdkClientException ex) {
+                               if (retryCount < listShardsMaxRetries && 
isRecoverableSdkClientException(ex)) {
+                                       long backoffMillis = fullJitterBackoff(
+                                               listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
+                                       LOG.warn("Got SdkClientException when 
listing shards from stream {}. Backing off for {} millis.",
+                                               streamName, backoffMillis);
+                                       Thread.sleep(backoffMillis);
+                               } else {
+                                       // propagate if retries exceeded or not 
recoverable
+                                       // (otherwise would return null result 
and keep trying forever)
+                                       throw ex;
+                               }
                        }
                }
                // Kinesalite (mock implementation of Kinesis) does not 
correctly exclude shards before
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
index 775ae4b3352..edf6ceb0d57 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
@@ -27,16 +27,24 @@
 import com.amazonaws.AmazonServiceException.ErrorType;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.AmazonKinesisException;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.ListShardsRequest;
 import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
@@ -54,6 +62,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -91,6 +100,37 @@ public void testIsRecoverableExceptionWithNullErrorType() {
                assertFalse(KinesisProxy.isRecoverableException(ex));
        }
 
+       @Test
+       public void testGetRecordsRetry() throws Exception {
+               Properties kinesisConsumerConfig = new Properties();
+               
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+
+               final GetRecordsResult expectedResult = new GetRecordsResult();
+               MutableInt retries = new MutableInt();
+               final Throwable[] retriableExceptions = new Throwable[] {
+                       new AmazonKinesisException("mock"),
+               };
+
+               AmazonKinesisClient mockClient = 
mock(AmazonKinesisClient.class);
+               Mockito.when(mockClient.getRecords(any())).thenAnswer(new 
Answer<GetRecordsResult>() {
+                       @Override
+                       public GetRecordsResult answer(InvocationOnMock 
invocation) throws Throwable{
+                               if (retries.intValue() < 
retriableExceptions.length) {
+                                       retries.increment();
+                                       throw 
retriableExceptions[retries.intValue() - 1];
+                               }
+                               return expectedResult;
+                       }
+               });
+
+               KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+               Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
+
+               GetRecordsResult result = 
kinesisProxy.getRecords("fakeShardIterator", 1);
+               assertEquals(retriableExceptions.length, retries.intValue());
+               assertEquals(expectedResult, result);
+       }
+
        @Test
        public void testGetShardList() throws Exception {
                List<String> shardIds =
@@ -151,6 +191,60 @@ public void testGetShardList() throws Exception {
                                                expectedStreamShard.toArray(new 
StreamShardHandle[actualShardList.size()])));
        }
 
+       @Test
+       public void testGetShardListRetry() throws Exception {
+               Properties kinesisConsumerConfig = new Properties();
+               
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+
+               Shard shard = new Shard();
+               shard.setShardId("fake-shard-000000000000");
+               final ListShardsResult expectedResult = new ListShardsResult();
+               expectedResult.withShards(shard);
+
+               MutableInt exceptionCount = new MutableInt();
+               final Throwable[] retriableExceptions = new Throwable[]{
+                       new AmazonKinesisException("attempt1"),
+                       new AmazonKinesisException("attempt2"),
+               };
+
+               AmazonKinesisClient mockClient = 
mock(AmazonKinesisClient.class);
+               Mockito.when(mockClient.listShards(any())).thenAnswer(new 
Answer<ListShardsResult>() {
+
+                       @Override
+                       public ListShardsResult answer(InvocationOnMock 
invocation) throws Throwable {
+                               if (exceptionCount.intValue() < 
retriableExceptions.length) {
+                                       exceptionCount.increment();
+                                       throw 
retriableExceptions[exceptionCount.intValue() - 1];
+                               }
+                               return expectedResult;
+                       }
+               });
+
+               KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+               Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
+
+               HashMap<String, String> streamNames = new HashMap();
+               streamNames.put("fake-stream", null);
+               GetShardListResult result = 
kinesisProxy.getShardList(streamNames);
+               assertEquals(retriableExceptions.length, 
exceptionCount.intValue());
+               assertEquals(true, result.hasRetrievedShards());
+               assertEquals(shard.getShardId(), 
result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());
+
+               // test max attempt count exceeded
+               int maxRetries = 1;
+               exceptionCount.setValue(0);
+               
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, 
String.valueOf(maxRetries));
+               kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+               Whitebox.getField(KinesisProxy.class, 
"kinesisClient").set(kinesisProxy, mockClient);
+               try {
+                       kinesisProxy.getShardList(streamNames);
+                       Assert.fail("exception expected");
+               } catch (SdkClientException ex) {
+                       assertEquals(retriableExceptions[maxRetries], ex);
+               }
+               assertEquals(maxRetries + 1, exceptionCount.intValue());
+       }
+
        @Test
        public void testCustomConfigurationOverride() {
                Properties configProps = new Properties();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kinesis Consumer listShards should support more recoverable exceptions
> ----------------------------------------------------------------------
>
>                 Key: FLINK-10020
>                 URL: https://issues.apache.org/jira/browse/FLINK-10020
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.1, 1.7.0
>
>
> Currently, transient errors in listShards cause the consumer to fail and the
> entire job to reset. That is unnecessary for certain exceptions (such as HTTP
> status 503 errors). It should be possible to control which exceptions qualify
> for retry, similar to how getRecords uses isRecoverableSdkClientException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to