[ https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128272#comment-17128272 ]
Aljoscha Krettek edited comment on FLINK-17327 at 6/8/20, 1:51 PM:
-------------------------------------------------------------------
I managed to find a fix for this:
- change our code to always use {{close()}} with a timeout on the Kafka Producer; otherwise we might leave lingering threads
- this alone does not work because of KAFKA-7763, i.e. on shutdown, requests are not properly cancelled, which leaves lingering threads
- the fix for KAFKA-7763 also introduces code that aborts outstanding transactions when cancelling. This doesn't work together with our exactly-once Kafka Producer
- you need a patched Kafka that includes the fix part of KAFKA-7763 without the code that aborts transactions; I'm attaching a patch for that against the Kafka 2.4 branch

The changes needed in Flink are here: https://github.com/aljoscha/flink/tree/flink-17327-kafka-clean-shutdown-2.4. The patch for Kafka is attached.
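The first point above — bounding {{close()}} with a timeout — can be sketched against the plain Kafka producer API. This is a minimal illustration, not Flink's actual wrapper code; the broker address and timeout value are placeholders:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TimedProducerClose {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // ... send records ...
        } finally {
            // close() without a timeout blocks until all in-flight requests
            // complete; if the brokers are unreachable this can hang forever
            // and leave the producer's network thread lingering. A bounded
            // close gives up after the timeout instead.
            producer.close(Duration.ofSeconds(30));
        }
    }
}
```

Note that even with the bounded close, the lingering-thread problem remains until the KAFKA-7763 fix is in place, since shutdown does not properly cancel outstanding requests.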
I don't think the Kafka project will like that patch, though, because aborting outstanding transactions is valid for Kafka Streams/KSQL, where pending transactions that are not cancelled will block downstream consumption.

> Kafka unavailability could cause Flink TM shutdown
> --------------------------------------------------
>
> Key: FLINK-17327
> URL: https://issues.apache.org/jira/browse/FLINK-17327
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.10.0
> Reporter: Jun Qin
> Priority: Critical
> Fix For: 1.12.0
>
> Attachments: 0001-Change-version-to-2.4.2-ALJOSCHA.patch, 0002-Don-t-abort-in-flight-transactions.patch, Standalonesession.log, TM.log, TM_produer_only_task.log
>
> Steps to reproduce:
> # Start a Flink 1.10 standalone cluster
> # Run a Flink job which reads from one Kafka topic and writes to another topic, with exactly-once checkpointing enabled
> # Stop all Kafka brokers after a few successful checkpoints
>
> When Kafka brokers are down:
> # {{org.apache.kafka.clients.NetworkClient}} reported that the connection to the broker could not be established
> # Then, Flink could not complete a snapshot due to {{Timeout expired while initializing transactional state in 60000ms}}
> # After several snapshot failures, Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}}
> # Eventually, Flink tried to cancel the task, which did not succeed within 3 min. According to the logs, the consumer was cancelled, but the producer was still running
> # Then {{Fatal error occurred while executing the TaskManager. Shutting it down...}}
>
> I will attach the logs to show the details.
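A minimal job matching step 2 of the reproduction might look like the sketch below. This is an assumption-laden illustration, not the reporter's actual job: topic names, the broker address, and the checkpoint interval are made up, and the serialization schemas are the simplest ones that fit:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpointing; the 10s interval is a placeholder.
        env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
           .addSink(new FlinkKafkaProducer<>(
                   "output-topic",
                   (KafkaSerializationSchema<String>) (element, timestamp) ->
                           new ProducerRecord<>("output-topic",
                                   element.getBytes(StandardCharsets.UTF_8)),
                   props,
                   // EXACTLY_ONCE makes the sink use Kafka transactions tied
                   // to Flink checkpoints -- the mode in which the hang occurs.
                   FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("kafka-to-kafka exactly-once");
    }
}
```

With the brokers stopped, it is the transactional state initialization inside this exactly-once sink that times out and eventually triggers the cancellation path described above.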
> Worth noting that if there were no consumer but only a producer in the task, the behavior is different:
> # {{org.apache.kafka.clients.NetworkClient}} reported that the connection to the broker could not be established
> # After {{delivery.timeout.ms}} (2 min by default), the producer reports: {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for output-topic-0:120001 ms has passed since batch creation}}
> # Flink tried to cancel the upstream tasks and created a new producer
> # The new producer obviously reported connectivity issues to the brokers
> # This continues until the Kafka brokers are back
> # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.}}
> # Flink cancelled the tasks and restarted them
> # The job continues, and a new checkpoint succeeded
> # The TM runs all the time in this scenario
>
> I set the Kafka transaction timeout to 1 hour just to avoid transaction timeouts during the test.
>
> To get a producer-only task, I called {{env.disableOperatorChaining();}} in the second scenario.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
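The two producer settings involved in this second scenario can be summarized as a config fragment (the values restate what the report above says; only the comments are added):

```properties
# Kafka producer configuration relevant to the producer-only test
transaction.timeout.ms=3600000   # raised to 1 hour so transactions don't expire mid-test
delivery.timeout.ms=120000       # the 2-min default that produces the "Expiring 4 record(s)" error
```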