tzulitai commented on a change in pull request #12850:
URL: https://github.com/apache/flink/pull/12850#discussion_r458507237



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
##########
@@ -51,148 +61,123 @@
 
        @Test
        public void testMetricsReporting() {
-               StreamShardHandle fakeToBeConsumedShard = 
getMockStreamShard("fakeStream", 0);
-
-               LinkedList<KinesisStreamShardState> 
subscribedShardsStateUnderTest = new LinkedList<>();
-               subscribedShardsStateUnderTest.add(
-                       new KinesisStreamShardState(
-                               
KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-                               fakeToBeConsumedShard,
-                               new SequenceNumber("fakeStartingState")));
+               KinesisProxyInterface kinesis = 
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(500, 5, 
500);
 
-               TestSourceContext<String> sourceContext = new 
TestSourceContext<>();
-
-               KinesisDeserializationSchemaWrapper<String> 
deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
-                       new SimpleStringSchema());
-               TestableKinesisDataFetcher<String> fetcher =
-                       new TestableKinesisDataFetcher<>(
-                               Collections.singletonList("fakeStream"),
-                               sourceContext,
-                               new Properties(),
-                               deserializationSchema,
-                               10,
-                               2,
-                               new AtomicReference<>(),
-                               subscribedShardsStateUnderTest,
-                               
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
-                               Mockito.mock(KinesisProxyInterface.class));
+               ShardMetricsReporter metrics = 
assertNumberOfMessagesReceivedFromKinesis(500, kinesis, fakeSequenceNumber());
+               assertEquals(500, metrics.getMillisBehindLatest());
+               assertEquals(10000, metrics.getMaxNumberOfRecordsPerFetch());
+       }
 
-               ShardMetricsReporter shardMetricsReporter = new 
ShardMetricsReporter();
-               long millisBehindLatest = 500L;
-               new ShardConsumer<>(
-                       fetcher,
-                       0,
-                       
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
-                       
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
-                       
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 
9, millisBehindLatest),
-                       shardMetricsReporter,
-                       deserializationSchema)
-                       .run();
+       @Test
+       public void 
testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceNumber() 
throws Exception {
+               KinesisProxyInterface kinesis = 
spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000,
 9, 500L));
 
-               // the millisBehindLatest metric should have been reported
-               assertEquals(millisBehindLatest, 
shardMetricsReporter.getMillisBehindLatest());
+               assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, 
fakeSequenceNumber());
+               verify(kinesis).getShardIterator(any(), 
eq("AFTER_SEQUENCE_NUMBER"), eq("fakeStartingState"));
        }
 
        @Test
-       public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
-               StreamShardHandle fakeToBeConsumedShard = 
getMockStreamShard("fakeStream", 0);
+       public void 
testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelTimestamp()
 throws Exception {
+               String format = "yyyy-MM-dd'T'HH:mm";
+               String timestamp = "2020-07-02T09:14";
+               Date expectedTimestamp = new 
SimpleDateFormat(format).parse(timestamp);
 
-               LinkedList<KinesisStreamShardState> 
subscribedShardsStateUnderTest = new LinkedList<>();
-               subscribedShardsStateUnderTest.add(
-                       new 
KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-                               fakeToBeConsumedShard, new 
SequenceNumber("fakeStartingState")));
+               Properties consumerProperties = new Properties();
+               consumerProperties.setProperty(STREAM_INITIAL_TIMESTAMP, 
timestamp);
+               consumerProperties.setProperty(STREAM_TIMESTAMP_DATE_FORMAT, 
format);
+               SequenceNumber sequenceNumber = 
SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get();
 
-               TestSourceContext<String> sourceContext = new 
TestSourceContext<>();
+               KinesisProxyInterface kinesis = 
spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(10, 
1, 0));
 
-               KinesisDeserializationSchemaWrapper<String> 
deserializationSchema = new KinesisDeserializationSchemaWrapper<>(
-                       new SimpleStringSchema());
-               TestableKinesisDataFetcher<String> fetcher =
-                       new TestableKinesisDataFetcher<>(
-                               Collections.singletonList("fakeStream"),
-                               sourceContext,
-                               new Properties(),
-                               deserializationSchema,
-                               10,
-                               2,
-                               new AtomicReference<>(),
-                               subscribedShardsStateUnderTest,
-                               
KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
-                               Mockito.mock(KinesisProxyInterface.class));
+               assertNumberOfMessagesReceivedFromKinesis(10, kinesis, 
sequenceNumber, consumerProperties);
+               verify(kinesis).getShardIterator(any(), eq("AT_TIMESTAMP"), 
eq(expectedTimestamp));
+       }
 
-               int shardIndex = 
fetcher.registerNewSubscribedShardState(subscribedShardsStateUnderTest.get(0));
-               new ShardConsumer<>(
-                       fetcher,
-                       shardIndex,
-                       
subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
-                       
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
-                       
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 
9, 500L),
-                       new ShardMetricsReporter(),
-                       deserializationSchema)
-                       .run();
+       @Test
+       public void 
testCorrectNumOfCollectedRecordsAndUpdatedStateWithStartingSequenceSentinelEarliest()
 throws Exception {
+               SequenceNumber sequenceNumber = 
SENTINEL_EARLIEST_SEQUENCE_NUM.get();
 
-               assertEquals(1000, sourceContext.getCollectedOutputs().size());
-               assertEquals(
-                       
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get(),
-                       
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum());
+               KinesisProxyInterface kinesis = 
spy(FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(50, 
2, 0));
+
+               assertNumberOfMessagesReceivedFromKinesis(50, kinesis, 
sequenceNumber);
+               verify(kinesis).getShardIterator(any(), eq("TRIM_HORIZON"), 
eq(null));
        }
 
        @Test
        public void 
testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
-               StreamShardHandle fakeToBeConsumedShard = 
getMockStreamShard("fakeStream", 0);
+               KinesisProxyInterface kinesis = 
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000,
 9, 7, 500L);
 
-               LinkedList<KinesisStreamShardState> 
subscribedShardsStateUnderTest = new LinkedList<>();
-               subscribedShardsStateUnderTest.add(
-                       new 
KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(fakeToBeConsumedShard),
-                               fakeToBeConsumedShard, new 
SequenceNumber("fakeStartingState")));
+               // Get a total of 1000 records with 9 getRecords() calls,
+               // and the 7th getRecords() call will encounter an unexpected 
expired shard iterator
+               assertNumberOfMessagesReceivedFromKinesis(1000, kinesis, 
fakeSequenceNumber());
+       }
 
-               TestSourceContext<String> sourceContext = new 
TestSourceContext<>();
+       @Test
+       public void 
testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads() {
+               Properties consumerProperties = new Properties();
+               consumerProperties.setProperty("flink.shard.adaptivereads", 
"true");

Review comment:
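       Is there a constant for this property key ("flink.shard.adaptivereads") that could be used here instead of the hard-coded string?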




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to