Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788363
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
        }
     
        @Test
    -   public void testCustomConfigurationOverride() {
    -           Properties configProps = new Properties();
    -           configProps.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
    -           KinesisProxy proxy = new KinesisProxy(configProps) {
    -                   @Override
    -                   protected AmazonKinesis createKinesisClient(Properties 
configProps) {
    -                           ClientConfiguration clientConfig = new 
ClientConfigurationFactory().getConfig();
    -                           clientConfig.setSocketTimeout(10000);
    -                           return AWSUtil.createKinesisClient(configProps, 
clientConfig);
    +   public void testGetShardList() throws Exception {
    +           List<String> shardIds =
    +                           Arrays.asList(
    +                                           "shardId-000000000000",
    +                                           "shardId-000000000001",
    +                                           "shardId-000000000002",
    +                                           "shardId-000000000003");
    +           shardIdSet = new HashSet<>(shardIds);
    +           shards =
    +                           shardIds
    +                                           .stream()
    +                                           .map(shardId -> new 
Shard().withShardId(shardId))
    +                                           .collect(Collectors.toList());
    +           Properties kinesisConsumerConfig = new Properties();
    +           
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
    +           
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"fake_accesskey");
    +           kinesisConsumerConfig.setProperty(
    +                           ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"fake_secretkey");
    +           KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
    +           AmazonKinesis mockClient = mock(AmazonKinesis.class);
    +           Whitebox.setInternalState(kinesisProxy, "kinesisClient", 
mockClient);
    +
    +           ListShardsResult responseWithMoreData =
    +                           new 
ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
    +           ListShardsResult responseFinal =
    +                           new 
ListShardsResult().withShards(shards.subList(2, 
shards.size())).withNextToken(null);
    +           doReturn(responseWithMoreData)
    +                           .when(mockClient)
    +                           
.listShards(argThat(initialListShardsRequestMatcher()));
    +           
doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
    +           HashMap<String, String> streamHashMap =
    +                           
createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
    +           GetShardListResult shardListResult = 
kinesisProxy.getShardList(streamHashMap);
    +
    +           Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
    +
    +           Set<String> expectedStreams = new HashSet<>();
    +           expectedStreams.add(fakeStreamName);
    +           
Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), 
expectedStreams);
    +           List<StreamShardHandle> actualShardList =
    +                           
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
    +           List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
    +           System.out.println(actualShardList.toString());
    +           assertThat(actualShardList, hasSize(4));
    +           for (int i = 0; i < 4; i++) {
    +                   StreamShardHandle shardHandle =
    +                                   new StreamShardHandle(
    +                                                   fakeStreamName,
    +                                                   new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
    +                   expectedStreamShard.add(shardHandle);
    +           }
    +
    +           Assert.assertThat(
    +                           actualShardList,
    +                           containsInAnyOrder(
    +                                           expectedStreamShard.toArray(new 
StreamShardHandle[actualShardList.size()])));
    +   }
    +
    +   private static class ListShardsRequestMatcher extends 
TypeSafeDiagnosingMatcher<ListShardsRequest> {
    +           private final String shardId;
    +           private final String nextToken;
    +
    +           ListShardsRequestMatcher(String shardIdArg, String 
nextTokenArg) {
    +                   shardId = shardIdArg;
    +                   nextToken = nextTokenArg;
    +           }
    +
    +           @Override
    +           protected boolean matchesSafely(final ListShardsRequest 
listShardsRequest, final Description description) {
    +                   if (shardId == null) {
    +                           if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) {
    --- End diff --
    
    Can we avoid using `StringUtils` here and just use a plain null check together with `!String.isEmpty()` (i.e. `s != null && !s.isEmpty()`) instead?


---

Reply via email to