[ https://issues.apache.org/jira/browse/FLINK-32196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726405#comment-17726405 ]
Sharon Xie edited comment on FLINK-32196 at 5/25/23 10:04 PM:
--------------------------------------------------------------

Thank you [~tzulitai] for the quick response and information.

{quote}Are you actually observing that there are lingering transactions not being aborted in Kafka? Or was that a speculation based on not seeing a abortTransaction() in the code?{quote}

This is speculation, so it may not be the root cause of the issue I'm seeing.

{quote}If there are actually lingering transactions in Kafka after restore, do they get timeout by Kafka after transaction.timeout.ms? Or are they lingering beyond the timeout threshold?{quote}

What I've observed is that the subtask gets stuck in the initializing state, the number of kafka-producer-network-thread threads keeps growing, and the job eventually runs OOM. In [^kafka_sink_oom_logs.csv] you can see lots of producers getting closed at the end.

In the debug log, I've found that the transaction thread never progresses beyond "Transition from state INITIALIZING to READY" and eventually times out. An example thread log is [^kafka_producer_network_thread_log.csv]. A healthy transaction goes from INITIALIZING to READY to COMMITTING_TRANSACTION and back to READY in the log, and the thread doesn't exit - see [^healthy_kafka_producer_thread.csv] for an example.

I've also queried Kafka's __transaction_state topic for the problematic transaction; [here|https://gist.github.com/sharonx/51e300e383455f016be1a95f0c855b97] are the messages in that topic.

I'd appreciate any pointers or potential explanations for this situation.
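As a side note, for anyone who wants to confirm or rule out a lingering transaction outside of Flink, here is a minimal sketch using the plain Kafka producer client (the broker address and transactional.id below are placeholders, not values from this job). It relies on the documented behavior that initTransactions() fences the previous producer epoch for a transactional.id, at which point the transaction coordinator aborts whatever transaction that epoch still had open.

{code:java}
// Minimal sketch (placeholder broker address and transactional.id) for aborting a
// lingering transaction of a known transactional.id with the plain Kafka client.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AbortLingeringTransaction {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-sink-prefix-0-42"); // placeholder id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Registers this producer with the transaction coordinator, bumps the epoch for
            // the transactional.id, and aborts the previous epoch's open transaction, if any.
            producer.initTransactions();
        }
    }
}
{code}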
> kafka sink under EO sometimes is unable to recover from a checkpoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-32196
>                 URL: https://issues.apache.org/jira/browse/FLINK-32196
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.6.4, 1.15.4
>            Reporter: Sharon Xie
>            Priority: Major
>         Attachments: healthy_kafka_producer_thread.csv, kafka_producer_network_thread_log.csv, kafka_sink_oom_logs.csv
>
> We are seeing an issue where a Flink job using the kafka sink under EO is unable to recover from a checkpoint. The sink task gets stuck in the `INITIALIZING` state and eventually runs OOM. The cause of the OOM is a kafka producer thread leak.
> Here is our best *hypothesis* for the issue.
> In `KafkaWriter` under the EO semantic, it intends to abort lingering transactions upon recovery: [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L175-L179]
> However, the actual implementation to abort those transactions in the `TransactionAborter` doesn't abort them: [https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java#L97-L124]
> Specifically, `producer.abortTransaction()` is never called in that function; instead it calls `producer.flush()`.
> Also, the function is in a for loop that only breaks when `producer.getEpoch() == 0`, which is why we are seeing a producer thread leak as the recovery gets stuck in that loop.
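To make the hypothesis above concrete, here is a simplified, self-contained sketch of the abort-on-recovery loop it describes. This is not the actual TransactionAborter source; TxnProducer, ProducerFactory, and the transactional-id format are stand-ins for Flink's internals. It only illustrates the reported pattern: initTransactions() and flush() are called, abortTransaction() is not, and the loop exits only once getEpoch() returns 0 for a transactional.id.

{code:java}
// Simplified sketch of the recovery-time abort loop described in the hypothesis above
// (illustrative stand-ins, not the actual TransactionAborter implementation).
public class AbortLoopSketch {

    /** Minimal stand-in for the producer wrapper used during recovery. */
    interface TxnProducer extends AutoCloseable {
        void initTransactions(); // fences the previous epoch, aborting its open transaction
        void flush();
        short getEpoch();        // 0 means no transaction was ever started for this id
        @Override void close();
    }

    /** Stand-in factory for a producer bound to a single transactional.id. */
    interface ProducerFactory {
        TxnProducer create(String transactionalId);
    }

    static void abortTransactionsOfSubtask(
            ProducerFactory factory, String prefix, int subtaskId, long startCheckpointId) {
        for (long checkpointId = startCheckpointId; ; checkpointId++) {
            String transactionalId = prefix + "-" + subtaskId + "-" + checkpointId; // illustrative id format
            try (TxnProducer producer = factory.create(transactionalId)) {
                producer.initTransactions();
                producer.flush();
                if (producer.getEpoch() == 0) {
                    // Epoch 0 means this id was never used, so probing stops here. If the epoch
                    // never reaches 0, the loop keeps creating producers, which matches the
                    // observed producer-thread growth.
                    break;
                }
            }
        }
    }
}
{code}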