Grzegorz Liter created FLINK-37460:
--------------------------------------
Summary: Using State Processor API and Kafka Sink with Exactly once delivery leads to org.apache.kafka.common.errors.InvalidPidMappingException
Key: FLINK-37460
URL: https://issues.apache.org/jira/browse/FLINK-37460
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.20.1
Reporter: Grzegorz Liter
Setup:
# Job with a KafkaSink using `DeliveryGuarantee.EXACTLY_ONCE`
# Kafka cluster with a maximum transaction time of, e.g., 24h
Steps to reproduce:
# Run the job for at least 24h
# Stop the job with a savepoint
# Use the State Processor API to rewrite the savepoint
# Start the job from the rewritten savepoint
Actual:
The job fails after about 24h with the following exception (logged at 2025-03-03 08:42:14.749; frames are listed outermost first):
```
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
    at java.lang.Thread.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
    at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
    at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
```
Explanation:
When using `EXACTLY_ONCE`, KafkaSink writes a `KafkaCommittable` to the checkpoint;
it contains the current `transactionalId` together with the current `producerId`.
After the checkpoint completes, the transaction is committed.
When Flink starts the job from a checkpoint/savepoint, during state initialization
(`AbstractStreamOperator.initializeState`) `CommitterOperator.commitAndEmitCheckpoints`
collects all checkpoint committables up to the last completed checkpoint via
`committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)`
and tries to commit them.
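The restore-time commit described above can be modeled with a minimal, self-contained sketch. It uses no Flink classes: `Committable`, `CommittableStore`, and `takeUpTo` are hypothetical stand-ins for `KafkaCommittable`, the committable collector, and `getCheckpointCommittablesUpTo`, assuming pending committables are kept in a map sorted by checkpoint ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RestoreCommitSketch {

    // Hypothetical stand-in for KafkaCommittable: the transaction to commit for one checkpoint.
    static final class Committable {
        final long checkpointId;
        final String transactionalId;
        final long producerId;

        Committable(long checkpointId, String transactionalId, long producerId) {
            this.checkpointId = checkpointId;
            this.transactionalId = transactionalId;
            this.producerId = producerId;
        }
    }

    // Hypothetical stand-in for the committable collector restored from a checkpoint/savepoint.
    static final class CommittableStore {
        private final NavigableMap<Long, List<Committable>> byCheckpoint = new TreeMap<>();

        void add(Committable c) {
            byCheckpoint.computeIfAbsent(c.checkpointId, id -> new ArrayList<>()).add(c);
        }

        // Same idea as getCheckpointCommittablesUpTo(lastCompletedCheckpointId): return (and drop)
        // every committable whose checkpoint ID is <= lastCompletedCheckpointId.
        List<Committable> takeUpTo(long lastCompletedCheckpointId) {
            NavigableMap<Long, List<Committable>> head =
                    byCheckpoint.headMap(lastCompletedCheckpointId, true);
            List<Committable> result = new ArrayList<>();
            for (List<Committable> perCheckpoint : head.values()) {
                result.addAll(perCheckpoint);
            }
            head.clear(); // headMap is a live view, so this also removes them from byCheckpoint
            return result;
        }
    }

    public static void main(String[] args) {
        CommittableStore store = new CommittableStore();
        store.add(new Committable(41, "txn-a", 1001L));
        store.add(new Committable(42, "txn-b", 1001L));
        // Hypothetical: a later checkpoint that has not completed yet stays pending.
        store.add(new Committable(43, "txn-c", 1001L));

        // Only committables up to the last completed checkpoint (42) are committed.
        for (Committable c : store.takeUpTo(42)) {
            System.out.println("commit " + c.transactionalId + " (producerId=" + c.producerId + ")");
        }
    }
}
```

Running `main` commits `txn-a` and `txn-b`; the committable for checkpoint 43 stays pending until a checkpoint with ID 43 or higher completes.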
The problem starts when we use the State Processor API, which resets the
checkpointId to 0. Initially there is no problem, but once the checkpointId
reaches the checkpointId of the savepoint we migrated,
`committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)`
loads not only the latest transaction details but also the last transaction that
happened before the migration. If the migration happened longer ago than the Kafka
transaction expiration time, `KafkaCommitter` gets
`org.apache.kafka.common.errors.InvalidPidMappingException`.
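A minimal sketch of the collision, under stated assumptions: restored committables are modeled as a map sorted by checkpoint ID, a committable from the old job's checkpoint 3 survives in the rewritten savepoint, the job is restored at hour 30, and checkpoint IDs restart from 1. All names are hypothetical; `EXPIRATION_HOURS` stands in for the 24h limit from the setup, and `IllegalStateException` is a stand-in for `InvalidPidMappingException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CheckpointIdResetSketch {

    // Hypothetical broker-side expiration, in hours (24h as in the setup).
    static final long EXPIRATION_HOURS = 24;

    // Hypothetical stand-in for KafkaCommittable, tagged with the hour its transaction started.
    static final class Committable {
        final long checkpointId;
        final String transactionalId;
        final long startedAtHour;

        Committable(long checkpointId, String transactionalId, long startedAtHour) {
            this.checkpointId = checkpointId;
            this.transactionalId = transactionalId;
            this.startedAtHour = startedAtHour;
        }
    }

    private final NavigableMap<Long, List<Committable>> pending = new TreeMap<>();

    void add(Committable c) {
        pending.computeIfAbsent(c.checkpointId, id -> new ArrayList<>()).add(c);
    }

    // Commits every committable with checkpoint ID <= lastCompletedCheckpointId. Throws for a
    // transaction older than EXPIRATION_HOURS (stand-in for InvalidPidMappingException).
    List<String> commitUpTo(long lastCompletedCheckpointId, long nowHour) {
        NavigableMap<Long, List<Committable>> head = pending.headMap(lastCompletedCheckpointId, true);
        List<String> committed = new ArrayList<>();
        for (List<Committable> perCheckpoint : head.values()) {
            for (Committable c : perCheckpoint) {
                if (nowHour - c.startedAtHour > EXPIRATION_HOURS) {
                    throw new IllegalStateException("expired producer id mapping for " + c.transactionalId);
                }
                committed.add(c.transactionalId);
            }
        }
        head.clear(); // drop the committed entries from the backing map
        return committed;
    }

    public static void main(String[] args) {
        CheckpointIdResetSketch sink = new CheckpointIdResetSketch();
        // Left over in the rewritten savepoint: the old job's checkpoint 3, started at hour 0.
        sink.add(new Committable(3, "old-txn", 0));

        // Restored at hour 30 with checkpoint IDs starting again from 1, one checkpoint per hour.
        for (long cp = 1; cp <= 3; cp++) {
            long now = 30 + cp;
            sink.add(new Committable(cp, "new-txn-" + cp, now));
            try {
                System.out.println("checkpoint " + cp + " committed " + sink.commitUpTo(cp, now));
            } catch (IllegalStateException e) {
                System.out.println("checkpoint " + cp + " failed: " + e.getMessage());
            }
        }
    }
}
```

Running `main` prints successful commits for checkpoints 1 and 2 and a failure at checkpoint 3, because the stale pre-migration committable and the new one now share checkpoint ID 3.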