[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711587#comment-15711587 ]
ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tony810430 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90420204

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
    @@ -145,6 +145,39 @@ public void testUnrecognizableStreamInitPositionTypeInConfig() {
         }

         @Test
    +    public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
    +        exception.expect(IllegalArgumentException.class);
    +        exception.expectMessage("Please set value for initial timestamp ('" +
    +            ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
    +
    +        Properties testConfig = new Properties();
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
    +        testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    +        testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
    +
    +        KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +    }
    +
    +    @Test
    +    public void testUnparsableDateForInitialTimestampInConfig() {
    +        exception.expect(IllegalArgumentException.class);
    +        exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
    --- End diff --

    ok


> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add
> this functionality with fairly low overhead:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-
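
For readers following the thread, here is a rough, self-contained sketch of the kind of check the two tests in the diff appear to exercise: when the initial position is AT_TIMESTAMP, a timestamp has to be present and parseable into a java.util.Date (the type that the linked setTimestamp method accepts). This is not the code from the pull request. The class name, the two property keys, and the date pattern are assumptions made up for illustration, and the second error message is only the portion visible in the truncated diff.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.Properties;

public class InitialTimestampSketch {

    // Hypothetical property keys, NOT the real ConsumerConfigConstants values.
    static final String INITIAL_POSITION_KEY = "stream.initial.position";
    static final String INITIAL_TIMESTAMP_KEY = "stream.initial.timestamp";

    // Assumed timestamp pattern; the format accepted by the connector may differ.
    static final String TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /**
     * Returns the configured start timestamp when the initial position is
     * AT_TIMESTAMP, or an empty Optional for any other initial position.
     * Throws IllegalArgumentException if the timestamp is missing or unparsable.
     */
    static Optional<Date> resolveInitialTimestamp(Properties config) {
        String position = config.getProperty(INITIAL_POSITION_KEY);
        if (!"AT_TIMESTAMP".equals(position)) {
            return Optional.empty();
        }

        String raw = config.getProperty(INITIAL_TIMESTAMP_KEY);
        if (raw == null) {
            throw new IllegalArgumentException(
                "Please set value for initial timestamp ('" + INITIAL_TIMESTAMP_KEY
                    + "') when using AT_TIMESTAMP initial position.");
        }

        SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);
        format.setLenient(false); // reject out-of-range fields such as month 13
        try {
            return Optional.of(format.parse(raw));
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                "Invalid value given for initial timestamp for AT_TIMESTAMP "
                    + "initial position in stream.", e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(INITIAL_POSITION_KEY, "AT_TIMESTAMP");
        config.setProperty(INITIAL_TIMESTAMP_KEY, "2016-12-01T00:00:00.000Z");

        // The resulting Date is what would be handed to the SDK request
        // when creating the shard iterator for each initial shard.
        resolveInitialTimestamp(config)
            .ifPresent(date -> System.out.println("Start from: " + date.getTime()));
    }
}
```

Validating once at configuration time, rather than when the first shard iterator is requested, means a bad timestamp fails the job before any consumer thread starts, which is the behaviour the two tests above seem to expect.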