[ 
https://issues.apache.org/jira/browse/FLINK-37460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17934235#comment-17934235
 ] 

Grzegorz Liter commented on FLINK-37460:
----------------------------------------

[~gaborgsomogyi] yes, as this is what happens during stopping and starting a 
job. The stored transaction details will be recent, and KafkaSink will be able 
to commit the (already committed) transaction without a problem, as it has not 
yet expired in the Kafka cluster.

To add context: this is a problem when we e.g. stop a job and then try to start 
it after the transaction has been expired by the Kafka cluster. This is 
documented here: 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#fault-tolerance
In this case the bug effectively simulates that scenario, as the transaction 
lingers in the state for a long time due to the checkpoint id being reset 
during state migration.
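
The effect of that reset can be illustrated with a tiny self-contained model of 
the committable bookkeeping (hypothetical names and a plain TreeMap here, not 
Flink's actual classes):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model of committables recovered on restore, keyed by
// checkpoint id. Names are illustrative, not Flink internals.
public class CommittableReplaySketch {
    public static void main(String[] args) {
        NavigableMap<Long, String> committables = new TreeMap<>();
        // A transaction committed long before the savepoint rewrite
        // still lingers in state under its original checkpoint id.
        committables.put(5000L, "txn-before-migration");

        // State Processor API reset the checkpoint id to 0; much later
        // the job has counted back up past the old savepoint's id.
        long lastCompletedCheckpointId = 5001L;

        // Analogue of getCheckpointCommittablesUpTo(lastCompletedCheckpointId):
        // the stale pre-migration transaction is now included again, even
        // though its producer id mapping has expired on the Kafka side.
        NavigableMap<Long, String> toCommit =
                committables.headMap(lastCompletedCheckpointId, true);
        System.out.println(toCommit); // {5000=txn-before-migration}
    }
}
```

As long as the restored checkpoint id stays below 5000, the stale entry is not 
selected; the failure only surfaces once the counter passes the old id again, 
which matches the delayed ~24h failure described below.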

> Using State Processor API and Kafka Sink with Exactly once delivery leads to 
> org.apache.kafka.common.errors.InvalidPidMappingException
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37460
>                 URL: https://issues.apache.org/jira/browse/FLINK-37460
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.20.1
>            Reporter: Grzegorz Liter
>            Priority: Major
>
> Setup:
> # Job with KafkaSink with `DeliveryGuarantee.EXACTLY_ONCE`
> # Kafka Cluster with max transaction time e.g. 24h
> Steps to reproduce:
> # Run the job for at least 24h
> # Stop with savepoint
> # Use State Processor API to rewrite savepoint
> # Start job from rewritten savepoint
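> For reference, a sink configured as in step 1 might look like the sketch 
> below (broker address, topic, and transactional id prefix are placeholders; 
> the timeout mirrors the 24h broker limit from the setup):
> ```java
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers("broker:9092")        // placeholder
>         .setRecordSerializer(
>                 KafkaRecordSerializationSchema.builder()
>                         .setTopic("output-topic")  // placeholder
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>         .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>         .setTransactionalIdPrefix("my-job")        // placeholder
>         // must not exceed the broker's transaction.max.timeout.ms (24h here)
>         .setProperty("transaction.timeout.ms",
>                 String.valueOf(24 * 60 * 60 * 1000))
>         .build();
> ```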
> Actual:
> Job will fail after about 24h with 
> ```
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
>       at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
>       at java.lang.Thread.run(Unknown Source) ~[?:?]
> ```
> Explanation:
> When using EXACTLY_ONCE, KafkaSink writes a KafkaCommittable to the checkpoint 
> which contains the current transactionalId with the current producerId. After 
> the checkpoint is completed, the transaction is committed.
> When Flink starts the job from a checkpoint/savepoint, during state 
> initialization (`AbstractStreamOperator.initializeState`) it will, in 
> `CommitterOperator.commitAndEmitCheckpoints`, get all checkpoint committables 
> up to the last completed checkpoint via 
> `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)`
>  and try to commit them.
> The problem starts when we use the State Processor API, which resets the 
> checkpointId to 0. Initially there is no problem, but eventually the job 
> reaches a checkpointId equal to the checkpointId of the savepoint that was 
> migrated. At that point 
> `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)`
>  loads not only the latest transaction details but also the last transaction 
> that happened before the migration. If more time than the Kafka transaction 
> expiration time has passed since the migration, the KafkaCommitter will get 
> `org.apache.kafka.common.errors.InvalidPidMappingException`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
