[ https://issues.apache.org/jira/browse/FLINK-35396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Suxing Lee updated FLINK-35396:
-------------------------------
    Affects Version/s:     (was: 1.16.2)

> DynamoDB Streams Consumer consumption data is out of order
> ----------------------------------------------------------
>
>                 Key: FLINK-35396
>                 URL: https://issues.apache.org/jira/browse/FLINK-35396
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / AWS, Connectors / Kinesis
>    Affects Versions: aws-connector-4.2.0
>            Reporter: Suxing Lee
>            Priority: Major
>              Labels: AWS
>
> When we use `FlinkDynamoDBStreamsConsumer` from
> `flink-connector-aws/flink-connector-kinesis` to consume DynamoDB stream
> data, records arrive out of order.
> The relevant service log is as follows:
> {noformat}
> 2024-05-06 00:00:40,639 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 has discovered a new shard StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', shard='{ShardId: shardId-00000001714924828427-d73b6b68,ParentShardId: shardId-00000001714910797443-fb1d3b22,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber: 2958376400000000058201168012,}}'} due to resharding, and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2807
> ......
> ......
> ......
> 2024-05-06 00:00:46,729 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 has reached the end of subscribed shard: StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', shard='{ShardId: shardId-00000001714910797443-fb1d3b22,ParentShardId: shardId-00000001714897099372-17932b9a,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber: 2955440900000000051102788386,}}'}
> {noformat}
> The failure sequence appears to be:
> `2024-05-06 00:00:40,639`: a new child shard (ShardId: shardId-00000001714924828427-d73b6b68) is discovered and consumed immediately.
> `2024-05-06 00:00:46,729`: consumption of the old parent shard (ShardId: shardId-00000001714910797443-fb1d3b22) ends.
> There was a gap of 6 seconds. In other words, the child shard had already started being consumed before the parent shard's data had been fully drained. This causes the records we read to be sent downstream out of order.
> https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L689-L740
> The cause seems to be that `discoverNewShards` immediately submits a `ShardConsumer` for each newly discovered shard to `shardConsumersExecutor`, and `shardConsumersExecutor` is created via `Executors.newCachedThreadPool()`, which does not limit the number of threads. As a result, new and old shards are consumed at the same time, so data consumption is out of order.
> `flink-connector-kinesis` relies on `dynamodb-streams-kinesis-adapter` to subscribe to records from the DynamoDB stream. But why does `dynamodb-streams-kinesis-adapter` not have a similar problem when it consumes data directly?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
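Follow-up: the race described above can be illustrated with a minimal, hypothetical sketch (not the actual Flink or adapter code). It uses the same unbounded `Executors.newCachedThreadPool()` as `KinesisDataFetcher`, but gates the child-shard consumer on a latch that is only released once the parent shard is drained — one possible way to restore parent-before-child ordering; the class and method names here are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: enforce parent-before-child shard ordering even though
// both consumers are submitted to an unbounded thread pool concurrently.
public class ShardOrderingSketch {

    static List<String> run() throws InterruptedException {
        List<String> consumed = new CopyOnWriteArrayList<>();
        // Unbounded pool, mirroring how shardConsumersExecutor is created.
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch parentDone = new CountDownLatch(1);

        // Child is submitted first, mimicking discoverNewShards racing ahead
        // of the parent shard's consumer.
        pool.submit(() -> {
            try {
                parentDone.await();      // block until the parent is drained
                consumed.add("child");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.submit(() -> {
            consumed.add("parent");      // drain the parent shard
            parentDone.countDown();      // then release the child consumer
        });

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        // The parent record always precedes the child record, regardless of
        // which consumer thread starts first.
        System.out.println(run());
    }
}
```

Without the latch, the cached thread pool happily runs both consumers in parallel and either ordering is possible — which is exactly the out-of-order symptom reported above.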