[https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17445945#comment-17445945]

Jiahui Jiang edited comment on FLINK-16419 at 11/18/21, 3:25 PM:
-----------------------------------------------------------------

[~fpaul] we actually set it to a very high value (2147483647 ms, which is roughly 24.9 days) to avoid the issue. We were still seeing "The producer attempted to use a producer id which is not currently assigned to its transactional id", but I believe a transaction timeout would throw an explicit timeout exception instead.
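As a quick sanity check on that number (2147483647 ms is `Integer.MAX_VALUE`, a common upper bound for millisecond-valued Kafka configs), the conversion to days is:

```java
public class TimeoutInDays {
    // Convert a millisecond duration to days: 1000 ms * 60 s * 60 min * 24 h.
    static double msToDays(long ms) {
        return ms / 86_400_000.0;
    }

    public static void main(String[] args) {
        // Integer.MAX_VALUE milliseconds is roughly 24.86 days.
        System.out.println(msToDays(Integer.MAX_VALUE));
    }
}
```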

I was reading the Kafka transactions design doc: even though the transactional ID shouldn't expire while transactions are being committed, the producer ID itself may expire if no record is written for `transactional.id.expiration.ms` amount of time.

[Link|https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit] to the original design doc; see the `PID Expiration` section on page 37: `We expire producerId’s when the age of the last message with that producerId exceeds the transactionalId expiration time or the topic’s retention time, whichever happens sooner.`
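That quoted rule can be restated as a one-line predicate (my own simplified model for illustration, not actual broker code):

```java
public class PidExpirationRule {
    // A producerId expires once the age of its last message exceeds either the
    // transactionalId expiration time or the topic's retention time,
    // whichever bound is smaller ("whichever happens sooner").
    static boolean isExpired(long lastMessageAgeMs,
                             long transactionalIdExpirationMs,
                             long topicRetentionMs) {
        return lastMessageAgeMs > Math.min(transactionalIdExpirationMs, topicRetentionMs);
    }

    public static void main(String[] args) {
        System.out.println(isExpired(100, 50, 200)); // past the txn-id bound
        System.out.println(isExpired(40, 50, 200));  // within both bounds
    }
}
```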

In Flink, each time we begin a transaction for a checkpoint it creates [a new producer|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L987]. But with the same transactional id, Kafka returns the [existing PID|https://github.com/apache/kafka/blob/6b005b2b4eece81a5500fb0080ef5354b4240681/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L234] if the epoch is not exhausted.
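A heavily simplified model of that coordinator behavior (illustrative only, not the actual Scala implementation): the same transactional id keeps mapping to the same PID with a bumped epoch, until the epoch space is exhausted and a fresh PID is allocated.

```java
import java.util.HashMap;
import java.util.Map;

public class PidReuseModel {
    // transactional.id -> {producerId, epoch}; epochs are 16-bit in Kafka.
    private final Map<String, long[]> state = new HashMap<>();
    private long nextPid = 0;

    long initProducerId(String transactionalId) {
        long[] entry = state.get(transactionalId);
        if (entry != null && entry[1] < Short.MAX_VALUE) {
            entry[1]++;          // bump the epoch, keep the existing PID
            return entry[0];
        }
        long pid = nextPid++;    // unknown id or exhausted epoch: new PID
        state.put(transactionalId, new long[] {pid, 0});
        return pid;
    }

    public static void main(String[] args) {
        PidReuseModel coordinator = new PidReuseModel();
        long pid = coordinator.initProducerId("flink-sink-0");
        // Re-initializing with the same transactional id reuses the PID.
        System.out.println(pid == coordinator.initProducerId("flink-sink-0"));
    }
}
```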


was (Author: qzhzm173227):
[~fpaul] we actually set it to a very high value (2147483647 ms, which is roughly 24.9 days) to avoid the issue. We were still seeing "The producer attempted to use a producer id which is not currently assigned to its transactional id", but I believe a transaction timeout would throw an explicit timeout exception instead.

I was reading the Kafka transactions design doc: even though the transactional ID shouldn't expire while transactions are being committed, the producer ID itself may expire if no record is written for `transactional.id.expiration.ms` amount of time.

[Link|https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit] to the original design doc; see the `PID Expiration` section on page 37: `We expire producerId’s when the age of the last message with that producerId exceeds the transactionalId expiration time or the topic’s retention time, whichever happens sooner.`

In Flink, each time we begin a transaction for a checkpoint it creates [a new producer|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L987]. But with the same transactional id, Kafka returns the existing PID (see design doc page 21, `2.1 When a TransactionalId is specified`).

> Avoid to recommit transactions which are known committed successfully to 
> Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16419
>                 URL: https://issues.apache.org/jira/browse/FLINK-16419
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>            Reporter: Jun Qin
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer 
> tries to recommit all pre-committed transactions which are in the snapshot, 
> even if those transactions were successfully committed before (i.e., the call 
> to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} 
> returned OK). This may lead to recovery failures when recovering from a very 
> old snapshot, because the transactional IDs in that snapshot may have expired 
> and been removed from Kafka. For example, consider the following scenario:
>  # Start a Flink job with FlinkKafkaProducer sink with exactly-once
>  # Suspend the Flink job with a savepoint A
>  # Wait for time longer than {{transactional.id.expiration.ms}} + 
> {{transaction.remove.expired.transaction.cleanup.interval.ms}}
>  # Recover the job with savepoint A.
>  # The recovery will fail with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata  - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer  - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
>         at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
>         at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
>         at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call 
> {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as 
> it may hide real transaction timeout errors.
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible 
> way is to let the JobManager, after it successfully notifies all operators of 
> the completion of a snapshot (via {{notifyCheckpointComplete}}), record the 
> success, e.g., write the successful transactional IDs somewhere in the 
> snapshot. Then those transactions need not be recommitted upon recovery.
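The record-the-success idea quoted above could look roughly like the following (all names here are illustrative, not actual Flink APIs): the snapshot carries the set of transactional IDs whose commit was acknowledged, and recovery skips recommitting those.

```java
import java.util.HashSet;
import java.util.Set;

public class RecoveryCommitFilter {
    // Transactional IDs whose commitTransaction() returned OK before the
    // snapshot was finalized; persisted as part of the snapshot (hypothetical).
    private final Set<String> committedInSnapshot = new HashSet<>();

    void recordCommitted(String transactionalId) {
        committedInSnapshot.add(transactionalId);
    }

    // On recovery, only transactions not known to be committed are recommitted.
    boolean needsRecommit(String transactionalId) {
        return !committedInSnapshot.contains(transactionalId);
    }

    public static void main(String[] args) {
        RecoveryCommitFilter filter = new RecoveryCommitFilter();
        filter.recordCommitted("sink-7df19f87-1");
        System.out.println(filter.needsRecommit("sink-7df19f87-1")); // skip it
        System.out.println(filter.needsRecommit("sink-7df19f87-2")); // recommit
    }
}
```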



--
This message was sent by Atlassian Jira
(v8.20.1#820001)