Thanks for y our insight.

I am still trying to understand exactly what happens here. We currently have 
the default setting in kafka, and we set the "transaction.timeout.ms" to 15 
minutes (which also happen to be the default "transaction.max.timeout.ms".  My 
expectation would be that if our savepoint is more than 15 minutes old it would 
fail, but that is not the case.

I still think we need to extend the "transaction.max.timeout.ms" to something 
like 7 days, as a 7 days old savepoints is effectively worthless, and probably 
adjust  "transaction.timeout.ms" to be close to this.

But can you explain how "transactional.id.expiration.ms" influences the 
InvalidPidMappingException, or why having "transactional.id.expiration.ms" < 
"transaction.timeout.ms" helps?

Kind regards

Jean-Marc


________________________________
From: Yanfei Lei <fredia...@gmail.com>
Sent: Monday, April 22, 2024 03:28
To: Jean-Marc Paulin <j...@uk.ibm.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: [EXTERNAL] Re: Flink 1.18: Unable to resume from a savepoint with 
error InvalidPidMappingException

Hi JM,

Yes, `InvalidPidMappingException` occurs because the transaction is
lost in most cases.

For short-term, " transaction.timeout.ms" >
"transactional.id.expiration.ms" can ignore the
`InvalidPidMappingException`[1].
For long-term, FLIP-319[2] provides a solution.

[1] 
https://speakerdeck.com/rmetzger/3-flink-mistakes-we-made-so-you-wont-have-to?slide=13
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

Jean-Marc Paulin <j...@uk.ibm.com> 于2024年4月20日周六 02:30写道:
>
> Hi,
>
> we use Flink 1.18 with Kafka Sink, and we enabled `EXACTLY_ONCE` on one of 
> our kafka sink. We set the transation timeout to 15 minutes. When we try to 
> restore from a savepoint, way after that 15 minutes window, Flink enter in a 
> RESTARTING loop. We see the error:
>
> ```
> {
>   "exception": {
>     "exception_class": 
> "org.apache.kafka.common.errors.InvalidPidMappingException",
>     "exception_message": "The producer attempted to use a producer id which 
> is not currently assigned to its transactional id.",
>     "stacktrace": "org.apache.kafka.common.errors.InvalidPidMappingException: 
> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id.\n"
>   },
>   "@version": 1,
>   "source_host": "aiops-ir-lifecycle-eventprocessor-ep-jobmanager-0",
>   "message": "policy-exec::schedule-policy-execution -> 
> (policy-exec::select-kafka-async-policy-stages, 
> policy-exec::select-async-policy-stages -> 
> policy-exec::execute-async-policy-stages, 
> policy-exec::select-non-async-policy-stages, Sink: stories-input, Sink: 
> policy-completion-results, Sink: stories-changes, Sink: alerts-input, Sink: 
> story-notifications-output, Sink: alerts-output, Sink: alerts-changes, Sink: 
> connector-alerts, Sink: updated-events-output, Sink: stories-output, Sink: 
> runbook-execution-requests) (6/6) 
> (3f8cb042c1aa628891c444466a8b52d1_593c33b9decafa4ad6ae85c185860bef_5_0) 
> switched from INITIALIZING to FAILED on 
> aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc:6122-d2828c
>  @ 
> aiops-ir-lifecycle-eventprocessor-ep-taskmanager-1.aiops-ir-lifecycle-eventprocessor-ep-taskmanager.cp4aiops.svc.cluster.local
>  (dataPort=6121).",
>   "thread_name": "flink-pekko.actor.default-dispatcher-18",
>   "@timestamp": "2024-04-19T11:11:05.169+0000",
>   "level": "INFO",
>   "logger_name": "org.apache.flink.runtime.executiongraph.ExecutionGraph"
> }
> ```
> As much as I understanding the transaction is lost, would it be possible to 
> ignore this particular error and resume the job anyway?
>
> Thanks for any suggestions
>
> JM
>
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU



--
Best,
Yanfei

Unless otherwise stated above:

IBM United Kingdom Limited
Registered in England and Wales with number 741598
Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6 3AU

Reply via email to