[ https://issues.apache.org/jira/browse/FLINK-37416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abhi Gupta updated FLINK-37416:
-------------------------------
    Description: 
We found that, for one customer, when there are multiple restarts, state inconsistency issues arise between the JobManager and the TaskManager: a shard is in ASSIGNED state in the JobManager, but that shard is not even present in the TaskManager state. Here is a rough timeline of the two incidents:
h2. Instance-1

There are 4 task managers, as seen from the logs.
The data loss started at around {{2025-01-07 13:51:30.094}}. The job restarted from checkpoint 2 at around 13:39.

 
 * Task cancellation triggered at: {{2025-01-07 13:38:34.024}}
 * Task got cancelled at: {{2025-01-07 13:38:34.626}}
 * Task restarted at: {{2025-01-07 13:39:26.657}} (start time of the job from scratch after removing the savepoint; no checkpoint was used to restore the job)
 * Another cancellation triggered at: {{2025-01-07 13:52:21.386}}
 * Cancellation completed at: {{2025-01-07 13:52:21.976}}. Job restarted from checkpoint 2
 * Another cancellation triggered at: {{2025-01-07 15:36:55.472}}
 * This cancellation completed at: {{2025-01-07 15:36:57.627}}. Job restarted from checkpoint 106
 * Another cancellation triggered at: {{2025-01-07 16:43:17.177}}
 * This cancellation completed at: {{2025-01-07 16:43:17.661}}. Job restarted from checkpoint 172
 * Another cancellation triggered at: {{2025-01-07 17:49:33.387}}
 * This cancellation completed at: {{2025-01-07 17:49:34.117}}

Checkpoint completion times. These were taken from the following log line (a rough parsing sketch follows the table):

{{Committing buffer-money-movements-docstore-all-protos/95b40845f9d7ccf0dfbe8eeb58830194/chk-172/_metadata with MPU ID xxx}}
||Checkpoint||Time finished||
|2|2025-01-07T13:51:31.500|
|106|2025-01-07T15:36:07.984|
|172|2025-01-07T16:42:25.420|
|238|2025-01-07T17:49:25.897|
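
For reference, the mapping from those log lines to the table above can be reproduced with a small parser. This is only a sketch: the assumed line layout (timestamp first, then the committing message) and the class name are illustrative, not something the connector or its logging guarantees.

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: pair the checkpoint id from a "Committing .../chk-<N>/_metadata" line
// with the log record's timestamp. The timestamp-first layout is an assumption.
public class CheckpointCommitTimes {

    private static final Pattern COMMIT_LINE = Pattern.compile(
            "^(\\S+)\\s.*Committing\\s+\\S+/chk-(\\d+)/_metadata\\s+with MPU ID");

    public static void main(String[] args) {
        String sample = "2025-01-07T13:51:31.500 INFO ... Committing "
                + "buffer-money-movements-docstore-all-protos/95b40845f9d7ccf0dfbe8eeb58830194/chk-2/_metadata"
                + " with MPU ID xxx";
        Matcher m = COMMIT_LINE.matcher(sample);
        if (m.find()) {
            // Prints: checkpoint 2 finished at 2025-01-07T13:51:31.500
            System.out.println("checkpoint " + m.group(2) + " finished at " + m.group(1));
        }
    }
}
{code}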

Shards with state consistency issues, per operator (see the comparison sketch below):
{{d9142698e8c34a850f718000f27c06f1: [shardId-00000001736149119171-7fe92d34]}}
{{Missing splits in 464d94ba07e08db815bc2c93bc566b1a: [shardId-00000001736148094601-d38142b5]}}
{{Missing splits in e5540dfffda9a53d8430f30b2d41f1b4: [shardId-00000001736150901519-43bc576d]}}

Assignment times of those shards to the readers:
 * 2025-01-07T13:51:30.094 ({{shardId-00000001736149119171-7fe92d34}})
 * Never assigned ({{shardId-00000001736148094601-d38142b5}})
 * 2025-01-07T13:51:30.095 ({{shardId-00000001736150901519-43bc576d}})
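
For context, the "missing splits" above come down to diffing the shards the JobManager-side enumerator marks as ASSIGNED against the shards actually present in the TaskManager-side reader state. The sketch below only illustrates that comparison with plain collections; the class, method and parameter names are hypothetical and are not the connector's API.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the JM-vs-TM comparison that surfaces the inconsistency:
// shards marked ASSIGNED on the JobManager side but absent from the reader state.
public class ShardStateDiff {

    public static Map<String, Set<String>> missingOnTaskManagers(
            Map<String, Set<String>> assignedByEnumerator, // operator id -> shards ASSIGNED on the JM
            Map<String, Set<String>> heldByReaders) {      // operator id -> shards present in TM reader state

        Map<String, Set<String>> missing = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : assignedByEnumerator.entrySet()) {
            Set<String> diff = new HashSet<>(entry.getValue());
            diff.removeAll(heldByReaders.getOrDefault(entry.getKey(), Set.of()));
            if (!diff.isEmpty()) {
                missing.put(entry.getKey(), diff); // e.g. "464d94ba...": [shardId-...-d38142b5]
            }
        }
        return missing;
    }
}
{code}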

 
h2. Instance-2
 * Attempt to cancel some tasks at {{2025-02-14T20:04:35.840667Z}}
 * At {{2025-02-14T20:04:36.166849Z}}, TM-1-8 finally report that the job has been cancelled and that a job cancellation event has been sent to the JM, but seemingly there is no cancellation event in the JM logs
 * Again, at {{2025-02-14T20:05:46.940638Z}}, TM-1-8 try to cancel the job
 * At {{2025-02-14T20:05:47.151611Z}}, TM-1-8 complete the cancellation of the job and send a cancellation message to the JM, but again the JM never receives a cancellation event for the job
 * Job restored from savepoint 14 at {{2025-02-14T20:14:34.814526406Z}}

We see that at around {{2025-02-14T20:05:46.084798Z}} the child shard stopped being polled, so this might be the time when the state got corrupted.
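
One way to pin down the point where the child shard stopped being polled is simply to find the last TaskManager log line mentioning that shard id. A hypothetical helper for that (the class name and arguments are made up; it only assumes one log record per line):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper: print the last TaskManager log line mentioning a shard id,
// e.g. to confirm when a child shard stopped being polled.
public class LastShardMention {

    public static void main(String[] args) throws IOException {
        Path tmLog = Path.of(args[0]); // path to a TaskManager log file
        String shardId = args[1];      // e.g. "shardId-00000001736150901519-43bc576d"

        Optional<String> last;
        try (Stream<String> lines = Files.lines(tmLog)) {
            last = lines.filter(line -> line.contains(shardId))
                        .reduce((first, second) -> second); // keep the last matching line
        }
        System.out.println(last.orElse("shard id not found in " + tmLog));
    }
}
{code}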

 

> DDB Streams connector flink state inconsistency issue
> -----------------------------------------------------
>
>                 Key: FLINK-37416
>                 URL: https://issues.apache.org/jira/browse/FLINK-37416
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB
>         Environment: This is a Flink environment created using an EKS cluster 
> with the Karpenter autoscaler
>            Reporter: Abhi Gupta
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
