[ 
https://issues.apache.org/jira/browse/FLINK-37460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17934260#comment-17934260
 ] 

Grzegorz Liter commented on FLINK-37460:
----------------------------------------

[~gaborgsomogyi] Sure, let me know.

> Using State Processor API and Kafka Sink with Exactly once delivery leads to 
> org.apache.kafka.common.errors.InvalidPidMappingException
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37460
>                 URL: https://issues.apache.org/jira/browse/FLINK-37460
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.20.1
>            Reporter: Grzegorz Liter
>            Priority: Major
>
> Setup:
> # A job with a KafkaSink using `DeliveryGuarantee.EXACTLY_ONCE` (see the sketch after this list)
> # A Kafka cluster with a maximum transaction timeout of e.g. 24h
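> A minimal sketch of the sink from item 1, assuming the standard `KafkaSink` builder API from flink-connector-kafka; the broker address, topic, transactional id prefix, and timeout value are placeholders:
> ```java
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
>
> public class ExactlyOnceSinkSketch {
>     static KafkaSink<String> buildSink() {
>         return KafkaSink.<String>builder()
>                 .setBootstrapServers("kafka:9092")                    // placeholder broker
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic("output-topic")                     // placeholder topic
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>                 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                 .setTransactionalIdPrefix("my-job")                   // placeholder prefix
>                 // keep the producer timeout within the broker's transaction.max.timeout.ms (24h here)
>                 .setProperty("transaction.timeout.ms", String.valueOf(24 * 60 * 60 * 1000))
>                 .build();
>     }
> }
> ```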
> Steps to reproduce:
> # Run the job for at least 24h
> # Stop it with a savepoint
> # Use the State Processor API to rewrite the savepoint (see the sketch after this list)
> # Start the job from the rewritten savepoint
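> A minimal sketch of step 3, assuming the DataStream-based State Processor API in Flink 1.20 (`SavepointWriter`); the savepoint paths and operator uid are placeholders, and dropping one operator's state just stands in for whatever rewrite is actually performed:
> ```java
> import org.apache.flink.state.api.OperatorIdentifier;
> import org.apache.flink.state.api.SavepointWriter;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class RewriteSavepoint {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // Load the original savepoint, apply a rewrite, and write the result elsewhere.
>         SavepointWriter.fromExistingSavepoint(env, "s3://savepoints/original")
>                 .removeOperator(OperatorIdentifier.forUid("some-operator-uid"))
>                 .write("s3://savepoints/rewritten");
>
>         env.execute("rewrite-savepoint");
>     }
> }
> ```
> As the explanation below notes, a savepoint rewritten this way has its checkpoint id reset to 0.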
> Actual:
> The job fails after about 24h with:
> ```
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>     at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:85) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119) ~[?:?]
>     at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:134) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:190) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:174) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:135) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> ```
> Explanation:
> When using EXACTLY_ONCE, the KafkaSink writes a KafkaCommittable to the checkpoint, 
> which contains the current transactionalId together with the current producerId. 
> Once the checkpoint is completed, the transaction is committed.
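> Since the transaction is only committed when a checkpoint completes, this behaviour presumes checkpointing is enabled; a minimal sketch, with the interval chosen arbitrarily:
> ```java
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class CheckpointingSetup {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // The KafkaCommittable is written on checkpoint, and the Kafka transaction is
>         // committed once that checkpoint completes, so checkpointing must be on.
>         env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
>         // ... attach sources and the exactly-once KafkaSink here, then env.execute() ...
>     }
> }
> ```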
> When Flink starts the job from a checkpoint/savepoint, state initialization 
> (`AbstractStreamOperator.initializeState`) ends up in 
> `CommitterOperator.commitAndEmitCheckpoints`, which collects all checkpoint 
> committables up to the last completed checkpoint via 
> `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` 
> and tries to commit them.
> The problem starts when the State Processor API is used, because it resets the 
> checkpointId to 0. Initially there is no issue, but eventually the restored job's 
> checkpointId reaches the checkpointId of the savepoint that was migrated. At that 
> point 
> `committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)` 
> loads not only the latest transaction details but also the last transaction from 
> before the migration. If the migration happened more than the Kafka transaction 
> expiration time ago, the KafkaCommitter gets 
> `org.apache.kafka.common.errors.InvalidPidMappingException`.
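> To make the collision concrete, here is a toy model (not Flink's actual classes; all names are made up) of how a committable keyed by a pre-migration checkpoint id becomes eligible for commit again once the restarted id counter catches up:
> ```java
> import java.util.List;
> import java.util.NavigableMap;
> import java.util.TreeMap;
>
> /** Toy illustration of the checkpoint-id collision; not Flink code. */
> public class CheckpointIdCollision {
>     public static void main(String[] args) {
>         // Committables restored from the rewritten savepoint, keyed by the checkpoint
>         // id that produced them; the pre-migration transaction keeps its old, high id.
>         NavigableMap<Long, String> committables = new TreeMap<>();
>         committables.put(100_000L, "pre-migration transaction (expired on the Kafka side)");
>
>         // Right after the migration the id counter restarts at 0, so restores with a
>         // small lastCompletedCheckpointId never touch the stale entry.
>         System.out.println(upTo(committables, 10L));      // []
>
>         // Once the restored job's checkpoint ids reach the old value, the stale entry
>         // falls into range, is retried, and Kafka answers with InvalidPidMappingException.
>         System.out.println(upTo(committables, 100_000L)); // [pre-migration transaction ...]
>     }
>
>     // Mirrors the "all committables up to lastCompletedCheckpointId" selection.
>     static List<String> upTo(NavigableMap<Long, String> committables, long lastCompleted) {
>         return List.copyOf(committables.headMap(lastCompleted, true).values());
>     }
> }
> ```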



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
