[ https://issues.apache.org/jira/browse/FLINK-37460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-37460:
-----------------------------------
    Component/s: API / State Processor
                     (was: Runtime / Checkpointing)

> Using State Processor API and Kafka Sink with Exactly once delivery leads to org.apache.kafka.common.errors.InvalidPidMappingException
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37460
>                 URL: https://issues.apache.org/jira/browse/FLINK-37460
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.20.1
>            Reporter: Grzegorz Liter
>            Priority: Major
>
> Setup:
> # Job with KafkaSink with `DeliveryGuarantee.EXACTLY_ONCE`
> # Kafka cluster with a maximum transaction time of, e.g., 24h
>
> Steps to reproduce:
> # Run the job for at least 24h
> # Stop with savepoint
> # Use the State Processor API to rewrite the savepoint
> # Start the job from the rewritten savepoint
>
> Actual:
> The job fails after about 24h with:
> ```
> 2025-03-03 08:42:14.749 Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> ```
>
> Explanation:
> When using EXACTLY_ONCE, KafkaSink writes a KafkaCommittable to the checkpoint, which contains the current transactionalId and the current producerId. After the checkpoint completes, the transaction is committed.
> When Flink starts the job from a checkpoint/savepoint, state initialization (`AbstractStreamOperator.initializeState`) calls `CommitterOperator.commitAndEmitCheckpoints`, which collects all checkpoint committables up to the last completed checkpoint via `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` and tries to commit them.
> The problem starts when we use the State Processor API, which resets the checkpoint ID to 0. Initially there is no problem, because the committables restored from the migrated savepoint keep their old, higher checkpoint ID and are not selected yet. Once the new checkpoint ID reaches the checkpoint ID of the migrated savepoint, however, `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` returns not only the latest transaction details but also the last transaction that happened before the migration. If the migration happened longer ago than the Kafka transaction expiration time, KafkaCommitter fails with `org.apache.kafka.common.errors.InvalidPidMappingException`.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
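The mechanism described in the Explanation of FLINK-37460 can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Flink code: `StaleCommittableModel` is an illustrative stand-in for the committer state, the checkpoint ID 1000 and the `txn-*` strings are made up, and it assumes (as the report describes) that committables restored from the rewritten savepoint keep their old checkpoint ID while the job's checkpoint counter restarts from 0. Only the "commit everything up to the last completed checkpoint" rule is modelled; Kafka itself is not involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of the committer state described in FLINK-37460.
 * It is NOT Flink's CommittableCollector; it only mimics selecting
 * "all committables up to the last completed checkpoint".
 */
class StaleCommittableModel {

    // Pending committables keyed by the checkpoint ID that produced them.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

    /** Stores a committable (standing in for transactionalId/producerId) for a checkpoint. */
    void add(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    /** Removes and returns all committables whose checkpoint ID is <= the given ID. */
    List<String> getCheckpointCommittablesUpTo(long checkpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        List<String> result = new ArrayList<>();
        for (List<String> committables : ready.values()) {
            result.addAll(committables);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from 'pending'
        return result;
    }

    public static void main(String[] args) {
        StaleCommittableModel state = new StaleCommittableModel();

        // Hypothetical: the original job's savepoint was taken at checkpoint 1000
        // and still holds the committable of its last transaction.
        state.add(1000L, "txn-old");

        // After the State Processor API rewrite the job restores with checkpoint ID 0:
        // nothing is <= 0, so the old committable stays in state.
        System.out.println("restore: " + state.getCheckpointCommittablesUpTo(0L));

        // The new job checkpoints normally; each checkpoint commits only its own transaction.
        for (long id = 1; id < 1000; id++) {
            state.add(id, "txn-new-" + id);
            state.getCheckpointCommittablesUpTo(id);
        }

        // Once the new checkpoint ID reaches 1000, the stale committable is selected too.
        state.add(1000L, "txn-new-1000");
        System.out.println("checkpoint 1000: " + state.getCheckpointCommittablesUpTo(1000L));
    }
}
```

Running `main` prints `restore: []` and then `checkpoint 1000: [txn-old, txn-new-1000]`. In the real job, committing the `txn-old` entry long after its transactional id has expired on the broker is what the report identifies as the source of the `InvalidPidMappingException`.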