[ https://issues.apache.org/jira/browse/FLINK-20630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai reassigned FLINK-20630: ------------------------------------------- Assignee: Danny Cranmer > [Kinesis][DynamoDB] DynamoDB Streams Consumer fails to consume from Latest > -------------------------------------------------------------------------- > > Key: FLINK-20630 > URL: https://issues.apache.org/jira/browse/FLINK-20630 > Project: Flink > Issue Type: Bug > Components: Connectors / Kinesis > Affects Versions: 1.12.0 > Reporter: Danny Cranmer > Assignee: Danny Cranmer > Priority: Blocker > Labels: pull-request-available > Fix For: 1.12.1 > > > *Background* > When consuming from {{LATEST}}, the {{KinesisDataFetcher}} converts the shard > iterator type into an {{AT_TIMESTAMP}} to ensure all shards start from the > same position. When {{LATEST}} is used each shared would effectively start > from a different point in the time. > DynamoDB streams do not support {{AT_TIMESTAMP}} iterator type. > *Scope* > Remove shard iterator type transform for DynamoDB streams consumer. > *Reproduction Steps* > Create a simple application that consumer from {{LATEST}} using > {{FlinkDynamoDBStreamsConsumer}} > *Expected Results* > Consumer starts reading records from the head of the stream > *Actual Results* > An exception is thrown: > {code} > Caused by: > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: > 1 validation error detected: Value 'AT_TIMESTAMP' at 'shardIteratorType' > failed to satisfy constraint: Member must satisfy enum value set: > [AFTER_SEQUENCE_NUMBER, LATEST, AT_SEQUENCE_NUMBER, TRIM_HORIZON] (Service: > AmazonDynamoDBStreams; Status Code: 400; Error Code: ValidationException; > Request ID: AFQ8KCJAP74IN5MR5KD2FP0CTBVV4KQNSO5AEMVJF66Q9ASUAAJG) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544) > at > org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeGetShardIterator(AmazonDynamoDBStreamsClient.java:544) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.getShardIterator(AmazonDynamoDBStreamsClient.java:515) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.getShardIterator(AmazonDynamoDBStreamsAdapterClient.java:355) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:311) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:302) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:173) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:93) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:85) > at > org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:36) > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:469) > at > org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher.createShardConsumer(DynamoDBStreamsDataFetcher.java:107) > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:540) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:348) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)