[ 
https://issues.apache.org/jira/browse/FLINK-20630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-20630:
-------------------------------------------

    Assignee: Danny Cranmer

> [Kinesis][DynamoDB] DynamoDB Streams Consumer fails to consume from Latest
> --------------------------------------------------------------------------
>
>                 Key: FLINK-20630
>                 URL: https://issues.apache.org/jira/browse/FLINK-20630
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.12.0
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.1
>
>
> *Background*
> When consuming from {{LATEST}}, the {{KinesisDataFetcher}} converts the shard 
> iterator type into an {{AT_TIMESTAMP}} to ensure all shards start from the 
> same position. When {{LATEST}} is used each shared would effectively start 
> from a different point in the time.
> DynamoDB streams do not support {{AT_TIMESTAMP}} iterator type.
> *Scope*
> Remove shard iterator type transform for DynamoDB streams consumer. 
> *Reproduction Steps*
> Create a simple application that consumer from {{LATEST}} using 
> {{FlinkDynamoDBStreamsConsumer}}
> *Expected Results*
> Consumer starts reading records from the head of the stream
> *Actual Results*
> An exception is thrown:
> {code}
> Caused by: 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException:
>  1 validation error detected: Value 'AT_TIMESTAMP' at 'shardIteratorType' 
> failed to satisfy constraint: Member must satisfy enum value set: 
> [AFTER_SEQUENCE_NUMBER, LATEST, AT_SEQUENCE_NUMBER, TRIM_HORIZON] (Service: 
> AmazonDynamoDBStreams; Status Code: 400; Error Code: ValidationException; 
> Request ID: AFQ8KCJAP74IN5MR5KD2FP0CTBVV4KQNSO5AEMVJF66Q9ASUAAJG)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.doInvoke(AmazonDynamoDBStreamsClient.java:686)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:653)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.invoke(AmazonDynamoDBStreamsClient.java:642)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.executeGetShardIterator(AmazonDynamoDBStreamsClient.java:544)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient.getShardIterator(AmazonDynamoDBStreamsClient.java:515)
>       at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient.getShardIterator(AmazonDynamoDBStreamsAdapterClient.java:355)
>       at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:311)
>       at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardIterator(KinesisProxy.java:302)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.getShardIterator(PollingRecordPublisher.java:173)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.<init>(PollingRecordPublisher.java:93)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:85)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisherFactory.create(PollingRecordPublisherFactory.java:36)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:469)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.DynamoDBStreamsDataFetcher.createShardConsumer(DynamoDBStreamsDataFetcher.java:107)
>       at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:540)
>       at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:348)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to