Grzegorz Liter created FLINK-37460:
--------------------------------------

             Summary: Using State Processor API and Kafka Sink with Exactly 
once delivery leads to org.apache.kafka.common.errors.InvalidPidMappingException
                 Key: FLINK-37460
                 URL: https://issues.apache.org/jira/browse/FLINK-37460
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.20.1
            Reporter: Grzegorz Liter


Setup:
# Job with KafkaSink with `DeliveryGuarantee.EXACTLY_ONCE`
# Kafka Cluster with max transaction time e.g. 24h

Steps to reproduce:
# Run the job for at least 24h
# Stop with savepoint
# Use State Processor API to rewrite savepoint
# Start job from rewritten savepoint

Actual:
The job fails after about 24h with:
```
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. 2025-03-03 08:42:14.749
        at java.lang.Thread.run(Unknown Source) ~[?:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
        at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
        at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
```

Explanation:
When using EXACTLY_ONCE, KafkaSink writes a KafkaCommittable to the checkpoint, 
which contains the current transactionalId together with the current producerId. 
After the checkpoint completes, the transaction is committed.

When Flink starts the job from a checkpoint/savepoint, during state 
initialization (`AbstractStreamOperator.initializeState`) 
`CommitterOperator.commitAndEmitCheckpoints` fetches all checkpoint committables 
up to the last completed checkpoint via 
`committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` 
and tries to commit them.

The problem starts when we use the State Processor API, which resets the 
checkpointId to 0. Initially there is no problem, but it surfaces once the job 
reaches a checkpointId equal to the checkpointId of the savepoint that was 
migrated. At that point 
`committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` 
loads not only the latest transaction details but also the last transaction that 
happened before the migration. If the migration happened longer ago than the 
Kafka transaction expiration time, KafkaCommitter gets 
`org.apache.kafka.common.errors.InvalidPidMappingException`.
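
The replay described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not Flink's actual implementation: the class `CommittableReplaySketch`, its methods `add` and `drainUpTo`, the checkpoint id 1000 and the transaction names are all hypothetical. It only mimics the "everything up to and including the last completed checkpoint id" lookup using a sorted map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of pending committables keyed by checkpoint id. */
class CommittableReplaySketch {

    // checkpointId -> transactional ids waiting to be committed (illustrative values only)
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

    /** Registers a pending transaction under the checkpoint that produced it. */
    void add(long checkpointId, String transactionalId) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(transactionalId);
    }

    /** Returns and removes everything registered up to and including the given checkpoint id. */
    List<String> drainUpTo(long lastCompletedCheckpointId) {
        NavigableMap<Long, List<String>> head = pending.headMap(lastCompletedCheckpointId, true);
        List<String> drained = new ArrayList<>();
        head.values().forEach(drained::addAll);
        head.clear(); // clearing the view also removes the entries from 'pending'
        return drained;
    }

    public static void main(String[] args) {
        CommittableReplaySketch state = new CommittableReplaySketch();

        // Assumption: the original savepoint was taken at checkpoint id 1000 and
        // still carries the committable of the last transaction of the old job.
        long savepointCheckpointId = 1000L;
        state.add(savepointCheckpointId, "txn-from-original-job");

        // After the rewrite the job restores with checkpoint id 0: nothing is selected.
        System.out.println("restore at 0 commits: " + state.drainUpTo(0L));

        // The job keeps checkpointing; the old committable stays pending until the
        // new checkpoint id catches up with the old savepoint's checkpoint id.
        for (long id = 1; id <= savepointCheckpointId; id++) {
            state.add(id, "txn-" + id);
            List<String> toCommit = state.drainUpTo(id);
            if (toCommit.contains("txn-from-original-job")) {
                System.out.println("checkpoint " + id + " also replays: " + toCommit);
            }
        }
    }
}
```

In this model the stale entry is only picked up when the counter reaches the old checkpoint id. That would be consistent with the failure appearing roughly as long after the restart as the original job had been running, assuming a similar checkpoint interval.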





