Thanks for your insight. I am still trying to understand exactly what happens here. We currently have the default settings in Kafka, and we set "transaction.timeout.ms" to 15 minutes (which also happens to be the default "transaction.max.timeout.ms"). My expectation was that restoring from a savepoint more than 15 minutes old would fail, but that is not the case.
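Here is a rough way to picture that expectation. It is a sketch under stated assumptions, not a description of the broker's exact behaviour. A pending transaction is aborted by the broker once "transaction.timeout.ms" has passed, but its transactional id is only expired after a further "transactional.id.expiration.ms" of inactivity (Kafka's default is 7 days). `InvalidPidMappingException` is what a producer gets when its producer id is no longer assigned to its transactional id. If that model holds, a 15-minute-old savepoint and an 8-day-old savepoint fall into different buckets. The class `SavepointTxnAge`, its `classify` method and the bucket names are hypothetical. The assumption that the expiration clock starts only after the timeout is not confirmed anywhere in this thread.

```java
import java.time.Duration;

/**
 * Hypothetical helper, not part of Flink or Kafka: buckets a savepoint's age against
 * the two Kafka settings discussed in this thread. Rough model only.
 */
public class SavepointTxnAge {

    public enum AgeBucket {
        /** Younger than transaction.timeout.ms. */
        WITHIN_TXN_TIMEOUT,
        /** Past the transaction timeout, but the transactional id is presumably still known. */
        PAST_TXN_TIMEOUT,
        /** The transactional id may have expired: where InvalidPidMappingException is expected. */
        PAST_ID_EXPIRATION
    }

    /**
     * @param savepointAge              time elapsed since the savepoint was taken
     * @param transactionTimeout        producer setting transaction.timeout.ms
     * @param transactionalIdExpiration broker setting transactional.id.expiration.ms
     */
    public static AgeBucket classify(Duration savepointAge,
                                     Duration transactionTimeout,
                                     Duration transactionalIdExpiration) {
        if (savepointAge.compareTo(transactionTimeout) < 0) {
            return AgeBucket.WITHIN_TXN_TIMEOUT;
        }
        // Assumption: the id must stay idle for the full expiration period after the
        // transaction has been aborted on timeout, so the two durations add up.
        Duration expiryPoint = transactionTimeout.plus(transactionalIdExpiration);
        if (savepointAge.compareTo(expiryPoint) < 0) {
            return AgeBucket.PAST_TXN_TIMEOUT;
        }
        return AgeBucket.PAST_ID_EXPIRATION;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMinutes(15);   // value used in this thread
        Duration idExpiration = Duration.ofDays(7);  // Kafka's default transactional.id.expiration.ms
        Duration[] ages = {Duration.ofMinutes(10), Duration.ofHours(2), Duration.ofDays(8)};
        for (Duration age : ages) {
            System.out.println(age + " -> " + classify(age, timeout, idExpiration));
        }
    }
}
```

Under this model, a savepoint that is hours old lands in `PAST_TXN_TIMEOUT` rather than failing, which would match the behaviour described above. Only savepoints older than the id expiration land in the bucket where the exception is expected.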
I still think we need to extend "transaction.max.timeout.ms" to something like 7 days, since a savepoint older than 7 days is effectively worthless, and probably adjust "transaction.timeout.ms" to be close to this. But can you explain how "transactional.id.expiration.ms" influences the InvalidPidMappingException, or why having "transactional.id.expiration.ms" < "transaction.timeout.ms" helps?

Kind regards
Jean-Marc

________________________________
From: Yanfei Lei <fredia...@gmail.com>
Sent: Monday, April 22, 2024 03:28
To: Jean-Marc Paulin <j...@uk.ibm.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: [EXTERNAL] Re: Flink 1.18: Unable to resume from a savepoint with error InvalidPidMappingException

Hi JM,

Yes, in most cases `InvalidPidMappingException` occurs because the transaction has been lost.

For the short term, setting "transaction.timeout.ms" > "transactional.id.expiration.ms" works around the `InvalidPidMappingException`[1].
For the long term, FLIP-319[2] provides a solution.

[1] https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

On Sat, Apr 20, 2024 at 02:30, Jean-Marc Paulin <j...@uk.ibm.com> wrote:
>
> Hi,
>
> We use Flink 1.18 with the Kafka sink, and we enabled `EXACTLY_ONCE` on one of our Kafka sinks. We set the transaction timeout to 15 minutes. When we try to restore from a savepoint well after that 15-minute window, Flink enters a RESTARTING loop.
> We see the error:
>
> ```
> {
>   "exception": {
>     "exception_class": "org.apache.kafka.common.errors.InvalidPidMappingException",
>     "exception_message": "The producer attempted to use a producer id which is not currently assigned to its transactional id.",
>     "stacktrace": "org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.\n"
>   },
>   "@version": 1,
>   "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
>   "message": "policy-exec::schedule-policy-execution -> (policy-exec::select-kafka-async-policy-stages, policy-exec::select-async-policy-stages -> policy-exec::execute-async-policy-stages, policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: story-notifications-output, Sink: alerts-output, Sink: alerts-changes, Sink: connector-alerts, Sink: updated-events-output, Sink: stories-output, Sink: runbook-execution-requests) (6/6) (3f8cb042c1aa628891c444466a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) switched from INITIALIZING to FAILED on aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c @ aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local (dataPort=6121).",
>   "thread_name": "flink-pekko.actor.default-dispatcher-18",
>   "@timestamp": "2024-04-19T11:11:05.169+0000",
>   "level": "INFO",
>   "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph"
> }
> ```
>
> As far as I understand, the transaction is lost. Would it be possible to ignore this particular error and resume the job anyway?
>
> Thanks for any suggestions
>
> JM
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

--
Best,
Yanfei
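The earlier proposal is to raise "transaction.max.timeout.ms" to about 7 days and move "transaction.timeout.ms" close to it. The broker-side cap (15 minutes by default) has to be raised before the producer value, because Kafka refuses to initialize a transactional producer whose "transaction.timeout.ms" exceeds the broker's "transaction.max.timeout.ms". The sketch below is a hypothetical pre-flight check (`TxnTimeoutCheck` and `acceptedByBroker` are not part of Flink or Kafka). It compares the current and proposed producer values against the default and the raised broker limits.

```java
import java.time.Duration;
import java.util.Properties;

/**
 * Hypothetical pre-flight check, not part of Flink or Kafka: mirrors the broker rule
 * that transaction.timeout.ms must not exceed transaction.max.timeout.ms.
 */
public class TxnTimeoutCheck {

    /** Returns true if the producer's transaction.timeout.ms fits under the broker cap. */
    public static boolean acceptedByBroker(Properties producerProps, Duration brokerMaxTimeout) {
        String value = producerProps.getProperty("transaction.timeout.ms");
        if (value == null) {
            throw new IllegalArgumentException("transaction.timeout.ms is not set");
        }
        return Long.parseLong(value) <= brokerMaxTimeout.toMillis();
    }

    public static void main(String[] args) {
        Properties current = new Properties();   // producer value used in this thread
        current.setProperty("transaction.timeout.ms", String.valueOf(Duration.ofMinutes(15).toMillis()));

        Properties proposed = new Properties();  // producer value proposed in this thread
        proposed.setProperty("transaction.timeout.ms", String.valueOf(Duration.ofDays(7).toMillis()));

        Duration defaultBrokerMax = Duration.ofMinutes(15); // Kafka's default transaction.max.timeout.ms
        Duration raisedBrokerMax = Duration.ofDays(7);      // broker value proposed in this thread

        System.out.println("current vs default broker:  " + acceptedByBroker(current, defaultBrokerMax));
        System.out.println("proposed vs default broker: " + acceptedByBroker(proposed, defaultBrokerMax));
        System.out.println("proposed vs raised broker:  " + acceptedByBroker(proposed, raisedBrokerMax));
    }
}
```

Running `main` prints `true`, `false` and `true` for the three cases. This is why the broker change must come first.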