[ https://issues.apache.org/jira/browse/FLINK-16419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444812#comment-17444812 ]
Jiahui Jiang edited comment on FLINK-16419 at 11/16/21, 9:22 PM:
-----------------------------------------------------------------

[~becket_qin] [~qinjunjerry] We are seeing issues in production where a very low-throughput output stream can make the pipeline unrecoverable because of this. As mentioned in the ticket, {{producer.ignoreFailuresAfterTransactionTimeout()}} is a workaround, but it has to remain a manual operation, since a transaction-timeout failure can also be caused by a real service exception. Is it possible to prioritize this ticket? Thank you!

was (Author: qzhzm173227):
[~becket_qin] [~qinjunjerry] We are seeing issues in production where a very low-throughput output stream can make the pipeline unrecoverable because of this. We tried to increase {{transactional.id.expiration.ms}} on the broker side, but unfortunately that config is an integer, not a long, so the longest expiration we can set is roughly 24.8 days. Is there any workaround besides a Flink fix or writing our own Kafka producer? Thank you!

> Avoid to recommit transactions which are known committed successfully to
> Kafka upon recovery
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16419
>                 URL: https://issues.apache.org/jira/browse/FLINK-16419
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>            Reporter: Jun Qin
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor, usability
>
> When recovering from a snapshot (checkpoint/savepoint), FlinkKafkaProducer tries to recommit all pre-committed transactions which are in the snapshot, even if those transactions were already committed successfully (i.e., the call to {{kafkaProducer.commitTransaction()}} via {{notifyCheckpointComplete()}} returned OK). This may lead to recovery failures when recovering from a very old snapshot, because the transactional IDs in that snapshot may have expired and been removed from Kafka. For example, consider the following scenario:
> # Start a Flink job with a FlinkKafkaProducer sink in exactly-once mode
> # Suspend the Flink job with a savepoint A
> # Wait longer than {{transactional.id.expiration.ms}} + {{transaction.remove.expired.transaction.cleanup.interval.ms}}
> # Recover the job with savepoint A
> # The recovery fails with the following error:
> {noformat}
> 2020-02-26 14:33:25,817 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer  - Attempting to resume transaction Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1 with producerId 2001 and epoch 120
> 2020-02-26 14:33:25,914 INFO  org.apache.kafka.clients.Metadata  - Cluster ID: RN0aqiOwTUmF5CnHv_IPxA
> 2020-02-26 14:33:26,017 INFO  org.apache.kafka.clients.producer.KafkaProducer  - [Producer clientId=producer-1, transactionalId=Source: Custom Source -> Sink: Unnamed-7df19f87deec5680128845fd9a6ca18d-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-02-26 14:33:26,019 INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Sink: Unnamed (1/1) (a77e457941f09cd0ebbd7b982edc0f02) switched from RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: The producer attempted to use a producer id which is not currently assigned to its transactional id.
> 	at org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1191)
> 	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
> 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:288)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
> For now, the workaround is to call {{producer.ignoreFailuresAfterTransactionTimeout()}}. This is a bit risky, as it may hide real transaction timeout errors.
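> For reference, a minimal sketch of how the workaround might be wired up, assuming Flink's universal {{FlinkKafkaProducer}} (constructor variants differ across connector versions); the broker address, topic name, schema, timeout value, and the {{attachSink}} wrapper are placeholders:
> {code:java}
> import java.util.Optional;
> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
>
> public class IgnoreTimeoutWorkaround {
>
>     public static void attachSink(DataStream<String> stream) {
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
>         // Keep this at or below the broker's transaction.max.timeout.ms:
>         props.setProperty("transaction.timeout.ms", "900000");
>
>         FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
>                 "output-topic",                                // placeholder
>                 new SimpleStringSchema(),
>                 props,
>                 Optional.empty(),                              // default partitioning
>                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
>                 FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>
>         // The workaround: swallow commit failures for transactions that have
>         // likely timed out, instead of failing the recovery. Risky, because
>         // it can also hide real transaction timeout errors.
>         producer.ignoreFailuresAfterTransactionTimeout();
>
>         stream.addSink(producer);
>     }
> }
> {code}
> Note that this only suppresses commit failures that look like expired transactions during recovery; it does not change the recommit behaviour this ticket asks to fix.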
> After discussing with [~becket_qin], [~pnowojski] and [~aljoscha], a possible approach is to have the JobManager, after it successfully notifies all operators of the completion of a snapshot (via {{notifyCheckpointComplete}}), record that success, e.g., by writing the successful transactional IDs somewhere in the snapshot. Those transactions would then not need to be recommitted upon recovery.
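> For illustration only (this is not actual Flink internals; {{CommittedTransactionTracker}} and its methods are hypothetical names), the bookkeeping the proposal implies might look like:
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
>
> /**
>  * Hypothetical sketch of the proposed bookkeeping: remember which
>  * transactional IDs were committed successfully so recovery can skip them.
>  * In a real fix this set would be persisted in the snapshot, as suggested.
>  */
> public class CommittedTransactionTracker {
>
>     private final Set<String> committedTransactionalIds = new HashSet<>();
>
>     /** Called after commitTransaction() returned OK via notifyCheckpointComplete(). */
>     public void markCommitted(String transactionalId) {
>         committedTransactionalIds.add(transactionalId);
>     }
>
>     /** Consulted during recovery: only recommit IDs not known to be committed. */
>     public boolean needsRecommit(String transactionalId) {
>         return !committedTransactionalIds.contains(transactionalId);
>     }
> }
> {code}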