Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r187788363 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() { } @Test - public void testCustomConfigurationOverride() { - Properties configProps = new Properties(); - configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - KinesisProxy proxy = new KinesisProxy(configProps) { - @Override - protected AmazonKinesis createKinesisClient(Properties configProps) { - ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig(); - clientConfig.setSocketTimeout(10000); - return AWSUtil.createKinesisClient(configProps, clientConfig); + public void testGetShardList() throws Exception { + List<String> shardIds = + Arrays.asList( + "shardId-000000000000", + "shardId-000000000001", + "shardId-000000000002", + "shardId-000000000003"); + shardIdSet = new HashSet<>(shardIds); + shards = + shardIds + .stream() + .map(shardId -> new Shard().withShardId(shardId)) + .collect(Collectors.toList()); + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey"); + kinesisConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey"); + KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + AmazonKinesis mockClient = mock(AmazonKinesis.class); + Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient); + + ListShardsResult responseWithMoreData = + new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); + ListShardsResult responseFinal = + new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); + doReturn(responseWithMoreData) + .when(mockClient) + .listShards(argThat(initialListShardsRequestMatcher())); + doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN))); + HashMap<String, String> streamHashMap = + createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName)); + GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap); + + Assert.assertEquals(shardListResult.hasRetrievedShards(), true); + + Set<String> expectedStreams = new HashSet<>(); + expectedStreams.add(fakeStreamName); + Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams); + List<StreamShardHandle> actualShardList = + shardListResult.getRetrievedShardListOfStream(fakeStreamName); + List<StreamShardHandle> expectedStreamShard = new ArrayList<>(); + System.out.println(actualShardList.toString()); + assertThat(actualShardList, hasSize(4)); + for (int i = 0; i < 4; i++) { + StreamShardHandle shardHandle = + new StreamShardHandle( + fakeStreamName, + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))); + expectedStreamShard.add(shardHandle); + } + + Assert.assertThat( + actualShardList, + containsInAnyOrder( + expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()]))); + } + + private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> { + private final String shardId; + private final String nextToken; + + ListShardsRequestMatcher(String shardIdArg, String nextTokenArg) { + shardId = shardIdArg; + nextToken = nextTokenArg; + } + + @Override + protected boolean matchesSafely(final ListShardsRequest listShardsRequest, final Description description) { + if (shardId == null) { + if (StringUtils.isNotEmpty(listShardsRequest.getExclusiveStartShardId())) { --- End diff -- Can we avoid using `StringUtils` and just use `!String.isEmpty()` instead?
---