Grzegorz Liter created FLINK-37460:
--------------------------------------

             Summary: Using State Processor API and Kafka Sink with Exactly once delivery leads to org.apache.kafka.common.errors.InvalidPidMappingException
                 Key: FLINK-37460
                 URL: https://issues.apache.org/jira/browse/FLINK-37460
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.20.1
            Reporter: Grzegorz Liter

Setup:
# Job with KafkaSink with `DeliveryGuarantee.EXACTLY_ONCE`
# Kafka cluster with a maximum transaction time of e.g. 24h

Steps to reproduce:
# Run the job for at least 24h
# Stop with savepoint
# Use the State Processor API to rewrite the savepoint
# Start the job from the rewritten savepoint

Actual: The job fails after about 24h with
```
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2025-03-03 08:42:14.749 at java.lang.Thread.run(Unknown Source) ~[?:?]
2025-03-03 08:42:14.749 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
2025-03-03 08:42:14.749 at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
2025-03-03 08:42:14.749 at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
```

Explanation:
When using EXACTLY_ONCE, KafkaSink writes a `KafkaCommittable` to the checkpoint, which contains the current transactionalId together with the current producerId. After the checkpoint is completed, the transaction is committed.

When Flink starts the job from a checkpoint/savepoint, state initialization (`AbstractStreamOperator.initializeState`) reaches `CommitterOperator.commitAndEmitCheckpoints`, which gets all checkpoint committables up to the last completed checkpoint via `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` and tries to commit them.

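To illustrate the "up to" lookup described above, here is a minimal sketch in plain Java. It is a toy model, not Flink's actual `CommittableCollector`: the class `CommittableLookupSketch`, its methods, the transaction names and the checkpoint IDs are all hypothetical. The only assumption it encodes is that committables are selected purely by checkpoint ID, i.e. everything stored under an ID less than or equal to the last completed one is returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of a committable store keyed by checkpoint ID (hypothetical, not Flink code). */
final class CommittableLookupSketch {

    // Pending committables per checkpoint ID, kept sorted by ID.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

    /** Registers a committable (here just a transaction name) under a checkpoint ID. */
    void add(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    /** Returns every committable whose checkpoint ID is <= lastCompletedCheckpointId. */
    List<String> upTo(long lastCompletedCheckpointId) {
        List<String> result = new ArrayList<>();
        for (List<String> perCheckpoint
                : pending.headMap(lastCompletedCheckpointId, true).values()) {
            result.addAll(perCheckpoint);
        }
        return result;
    }

    public static void main(String[] args) {
        CommittableLookupSketch store = new CommittableLookupSketch();
        store.add(1000L, "old-txn"); // entry stored under a high checkpoint ID
        store.add(3L, "new-txn");    // entry stored under a low checkpoint ID

        System.out.println(store.upTo(3L));    // prints [new-txn]
        System.out.println(store.upTo(1000L)); // prints [new-txn, old-txn]
    }
}
```

In this model an entry stored under a high checkpoint ID stays invisible until the last completed checkpoint ID reaches it, no matter how long ago the entry was written.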
The problem starts when we use the State Processor API, which resets the checkpointId to 0. Initially there is no problem, but one appears once the job reaches a checkpointId equal to the checkpointId of the savepoint that was migrated. At that point `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` loads not only the latest transaction details but also the last transaction that happened before the migration. If the migration happened longer ago than the Kafka transaction expiration time, `KafkaCommitter` will get `org.apache.kafka.common.errors.InvalidPidMappingException`.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)