[ https://issues.apache.org/jira/browse/FLINK-37416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abhi Gupta updated FLINK-37416:
-------------------------------

Description:

We found that, for one customer with multiple job restarts, there are state inconsistency issues between the JobManager and the TaskManager: a shard can be in ASSIGNED state in the JobManager while not being present in the TaskManager state at all. (A minimal sketch of a cross-check between the two views follows the Instance-2 timeline below.) Here is a rough timeline of the two events:

h2. Instance-1

There are 4 task managers, as seen from the logs. The data loss started at around {{2025-01-07 13:51:30.094}}; the job restarted from checkpoint 2 at around 13:39.

* Task cancellation triggered at: {{2025-01-07 13:38:34.024}}
* Task cancelled at: {{2025-01-07 13:38:34.626}}
* Task restarted at: {{2025-01-07 13:39:26.657}} (start of the job from scratch after removing the savepoint; no checkpoint was used to restore the job)
* Another cancellation triggered at: {{2025-01-07 13:52:21.386}}
* Cancellation completed at: {{2025-01-07 13:52:21.976}}. Job restarted from checkpoint 2
* Another cancellation triggered at: {{2025-01-07 15:36:55.472}}
* Cancellation completed at: {{2025-01-07 15:36:57.627}}. Job restarted from checkpoint 106
* Another cancellation triggered at: {{2025-01-07 16:43:17.177}}
* Cancellation completed at: {{2025-01-07 16:43:17.661}}. Job restarted from checkpoint 172
* Another cancellation triggered at: {{2025-01-07 17:49:33.387}}
* Cancellation completed at: {{2025-01-07 17:49:34.117}}

Checkpoint completion times. These are taken from log lines of the form:
{{Committing buffer-money-movements-docstore-all-protos/95b40845f9d7ccf0dfbe8eeb58830194/chk-172/_metadata with MPU ID xxx}}

||Checkpoint||Time finished||
|2|2025-01-07T13:51:31.500|
|106|2025-01-07T15:36:07.984|
|172|2025-01-07T16:42:25.420|
|238|2025-01-07T17:49:25.897|

Shards with a state consistency issue, per operator:
{{d9142698e8c34a850f718000f27c06f1: [shardId-00000001736149119171-7fe92d34]}}
{{Missing splits in 464d94ba07e08db815bc2c93bc566b1a: [shardId-00000001736148094601-d38142b5]}}
{{Missing splits in e5540dfffda9a53d8430f30b2d41f1b4: [shardId-00000001736150901519-43bc576d]}}

Assignment times of those shards to the readers:
* 2025-01-07T13:51:30.094 ({{shardId-00000001736149119171-7fe92d34}})
* Never assigned ({{shardId-00000001736148094601-d38142b5}})
* 2025-01-07T13:51:30.095 ({{shardId-00000001736150901519-43bc576d}})

h2. Instance-2

* Attempt to cancel some tasks at {{2025-02-14T20:04:35.840667Z}}
* At {{2025-02-14T20:04:36.166849Z}} TM-1-8 finally reports that the job has been cancelled and that a job cancellation event has been sent to the JM, but there seems to be no corresponding cancellation event in the JM logs
* Again, at {{2025-02-14T20:05:46.940638Z}}, TM-1-8 tries to cancel the job
* At {{2025-02-14T20:05:47.151611Z}}, TM-1-8 completes cancellation of the job and sends a cancellation message to the JM, but again the JM never received a cancellation event for the job
* Job restore from savepoint 14: {{2025-02-14T20:14:34.814526406Z}}

We see that at around {{2025-02-14T20:05:46.084798Z}} the child shard stopped getting polled, so this might be the time when the state got corrupted.
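For reference, here is a minimal, self-contained sketch of the kind of cross-check that produces the "missing splits" lists above: compare the shard IDs the JobManager-side enumerator has marked ASSIGNED against the union of shard IDs actually held in the TaskManager-side reader states; anything left over is a shard that will neither be re-assigned nor read. This is not connector code; the class {{AssignedShardConsistencyCheck}} and its inputs are hypothetical, and the shard-ID sets are assumed to be extracted from JM/TM logs or state dumps.

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class AssignedShardConsistencyCheck {

    /**
     * Returns the shard IDs that the JobManager-side enumerator considers
     * ASSIGNED but that no TaskManager-side reader actually holds.
     */
    public static Set<String> lostShards(
            Set<String> enumeratorAssignedShards,
            Map<Integer, Set<String>> readerShardsBySubtask) {

        // Union of all shards present in any reader's state.
        Set<String> heldByAnyReader = new HashSet<>();
        readerShardsBySubtask.values().forEach(heldByAnyReader::addAll);

        // Shards the enumerator tracks as ASSIGNED but nobody is reading.
        Set<String> lost = new HashSet<>(enumeratorAssignedShards);
        lost.removeAll(heldByAnyReader);
        return lost;
    }

    public static void main(String[] args) {
        // Shard IDs taken from the Instance-1 report above; the per-subtask
        // reader state below is illustrative only.
        Set<String> jmAssigned = Set.of(
                "shardId-00000001736149119171-7fe92d34",
                "shardId-00000001736148094601-d38142b5",
                "shardId-00000001736150901519-43bc576d");

        Map<Integer, Set<String>> tmHeld = Map.of(
                0, Set.of("shardId-00000001736149119171-7fe92d34"),
                1, Set.of());

        System.out.println("ASSIGNED on the JM but held by no reader: "
                + lostShards(jmAssigned, tmHeld));
    }
}
{code}

Run with the illustrative inputs in {{main}}, this prints the shards that no reader holds, which matches the JM/TM divergence symptom described above.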
> DDB Streams connector flink state inconsistency issue
> -----------------------------------------------------
>
>             Key: FLINK-37416
>             URL: https://issues.apache.org/jira/browse/FLINK-37416
>         Project: Flink
>      Issue Type: Bug
>      Components: Connectors / DynamoDB
>     Environment: This is a Flink environment created using an EKS cluster with the Karpenter autoscaler
>        Reporter: Abhi Gupta
>        Priority: Major