Suxing Lee created FLINK-35396:
----------------------------------

             Summary: DynamoDB Streams Consumer consumption data is out of order
                 Key: FLINK-35396
                 URL: https://issues.apache.org/jira/browse/FLINK-35396
             Project: Flink
          Issue Type: Bug
          Components: Connectors / AWS, Connectors / Kinesis
    Affects Versions: 1.16.2
            Reporter: Suxing Lee


When we use `FlinkDynamoDBStreamsConsumer` in
`flink-connector-aws/flink-connector-kinesis` to consume DynamoDB stream data,
records are consumed out of order.
The relevant service log is as follows:
{noformat}
2024-05-06 00:00:40,639 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 has discovered a new shard 
StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', 
shard='{ShardId: shardId-00000001714924828427-d73b6b68,
  ParentShardId: shardId-00000001714910797443-fb1d3b22,HashKeyRange: 
{StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange: 
{StartingSequenceNumber: 2958376400000000058201168012,}}'} due to resharding, 
and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM 
with ShardConsumer 2807
......
......
......
2024-05-06 00:00:46,729 INFO  
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - 
Subtask 0 has reached the end of subscribed shard: 
StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', 
shard='{ShardId: shardId-00000001714910797443-fb1d3b22,ParentShardId: 
shardId-00000001714897099372-17932b9a,HashKeyRange: {StartingHashKey: 
0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber: 
2955440900000000051102788386,}}'}
{noformat}

The failure sequence appears to be:
`2024-05-06 00:00:40,639`: a new child shard (ShardId: shardId-00000001714924828427-d73b6b68) is discovered and consumption of it starts immediately.
`2024-05-06 00:00:46,729`: consumption of the old parent shard (ShardId: shardId-00000001714910797443-fb1d3b22) reaches its end.

There is a gap of about 6 seconds between these two events. In other words,
the child shard started being consumed before consumption of its parent shard
had finished, so the records we read are sent downstream out of order.
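
As a quick check of that gap, the two log timestamps can simply be subtracted. This is an illustration only: `ShardLogGap` and `gapMillis` are hypothetical names, and the sketch assumes both timestamps use the log's `yyyy-MM-dd HH:mm:ss,SSS` layout in the same time zone.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class ShardLogGap {
    // Timestamp layout of the log lines quoted above, e.g. "2024-05-06 00:00:40,639".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps (assumed to share a time zone). */
    public static long gapMillis(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, LOG_TS);
        LocalDateTime end = LocalDateTime.parse(later, LOG_TS);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Child shard discovered vs. parent shard reaching its end.
        long gap = gapMillis("2024-05-06 00:00:40,639", "2024-05-06 00:00:46,729");
        System.out.println("overlap window: " + gap + " ms");
    }
}
```

For the two log lines above this gives 6090 ms, i.e. roughly 6.1 seconds during which the child shard could be read while the parent shard was still being read.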

https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L689-L740
 

Is this because the code submits a `ShardConsumer` to `shardConsumersExecutor`
as soon as `discoverNewShards` finds a new shard? `shardConsumersExecutor` is
created with `Executors.newCachedThreadPool()`, which does not limit the number
of threads, so the new child shard and the old parent shard can be consumed at
the same time, and the data ends up out of order.
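
A minimal simulation of the suspected race, assuming two consumers that each just append records to a shared list (`CachedPoolInterleaving`, `run`, and the record names are hypothetical and not part of the connector). The latches only force the interleaving suggested by the logs so that the result is deterministic; in the real fetcher the overlap depends on timing. The missing thread cap is not essential here: any pool with at least two threads would let the tasks overlap, because nothing makes the child task wait for the parent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedPoolInterleaving {
    /** Runs a simulated parent and child shard consumer on a cached pool; returns the emit order. */
    public static List<String> run() {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch parentStarted = new CountDownLatch(1);
        CountDownLatch childEmitted = new CountDownLatch(1);
        // No thread cap: every submitted task gets a thread and starts right away.
        ExecutorService pool = Executors.newCachedThreadPool();

        // Hypothetical parent-shard consumer that still has records left after the child starts.
        pool.submit(() -> {
            emitted.add("parent-record-1");
            parentStarted.countDown();
            try {
                childEmitted.await(5, TimeUnit.SECONDS); // forces the interleaving seen in the logs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            emitted.add("parent-record-2");
        });

        // Hypothetical child-shard consumer, submitted as soon as the child shard is "discovered".
        pool.submit(() -> {
            try {
                parentStarted.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            emitted.add("child-record-1");
            childEmitted.countDown();
        });

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [parent-record-1, child-record-1, parent-record-2]
    }
}
```

With `Executors.newSingleThreadExecutor()` instead, the parent's `await` times out after 5 seconds and the order becomes `[parent-record-1, parent-record-2, child-record-1]`, but only because every task is then serialized onto one thread.
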

`flink-connector-kinesis` relies on `dynamodb-streams-kinesis-adapter` to
subscribe to messages from the DynamoDB stream. Why, then, does consuming data
directly through `dynamodb-streams-kinesis-adapter` not show a similar
problem?
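
To our understanding, `dynamodb-streams-kinesis-adapter` is normally used together with the Kinesis Client Library (KCL), which waits for a shard's parent shards to be fully processed before it starts processing the child shard (KCL's `BlockOnParentShardTask`); that may explain why direct use of the adapter does not show the problem. The sketch below illustrates the same parent-before-child ordering with `CompletableFuture`. It is not the connector's or the KCL's code: `ShardGate`, `submit`, and `demo` are hypothetical, a parent shard the gate has never seen is assumed to be already finished, and a child whose parent consumer fails is never run (its future completes exceptionally).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: starts a shard's consumer only after its parent shard's consumer has finished. */
public class ShardGate {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    // Completion future of each scheduled shard consumer, keyed by shard ID.
    private final Map<String, CompletableFuture<Void>> finished = new ConcurrentHashMap<>();

    /** Schedules a consumer; parentShardId may be null when the shard has no parent. */
    public CompletableFuture<Void> submit(String shardId, String parentShardId, Runnable consumer) {
        // Assumption: a parent this gate has never seen is treated as already finished.
        CompletableFuture<Void> parent = CompletableFuture.completedFuture(null);
        if (parentShardId != null && finished.containsKey(parentShardId)) {
            parent = finished.get(parentShardId);
        }
        // Runs on the pool only after the parent's future completes normally.
        CompletableFuture<Void> self = parent.thenRunAsync(consumer, pool);
        finished.put(shardId, self);
        return self;
    }

    public void shutdown() {
        pool.shutdown();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits a parent and its child back to back; returns the order in which records were emitted. */
    public static List<String> demo() {
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        ShardGate gate = new ShardGate();
        // The grandparent from the log is unknown to the gate, so the parent starts immediately.
        gate.submit("shardId-00000001714910797443-fb1d3b22",
                "shardId-00000001714897099372-17932b9a", () -> {
                    emitted.add("parent-record-1");
                    sleep(100); // parent still has records left when the child is submitted
                    emitted.add("parent-record-2");
                });
        gate.submit("shardId-00000001714924828427-d73b6b68",
                "shardId-00000001714910797443-fb1d3b22",
                () -> emitted.add("child-record-1"))
            .join();
        gate.shutdown();
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [parent-record-1, parent-record-2, child-record-1]
    }
}
```

The child is submitted while the parent is still sleeping, yet its record is emitted only after both parent records. Unlike a single-threaded executor, shards with no parent-child relationship can still run in parallel.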



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
