[ 
https://issues.apache.org/jira/browse/FLINK-17327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128272#comment-17128272
 ] 

Aljoscha Krettek edited comment on FLINK-17327 at 6/8/20, 1:51 PM:
-------------------------------------------------------------------

I managed to find a fix for this:

 - change our code to always call {{close()}} with a timeout on the Kafka 
Producer; otherwise, we might leave lingering threads
 - this alone does not work because of KAFKA-7763, i.e. on shutdown, in-flight 
requests are not properly cancelled, which leaves lingering threads
 - the fix for KAFKA-7763 also introduces code that aborts outstanding 
transactions when cancelling. This doesn't work together with our exactly-once 
Kafka Producer
 - you need a patched Kafka that includes the request-cancellation part of 
KAFKA-7763 without the code that aborts transactions. I'm attaching a patch 
for that against the Kafka 2.4 branch
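
The first point above can be sketched against the plain Kafka client API as 
follows (a minimal sketch; the broker address, topic, and 5-second timeout are 
illustrative assumptions, not values from this issue):

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class BoundedProducerClose {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // ... producer.send(...) calls go here ...
        } finally {
            // Bounded close: the no-argument close() can block indefinitely
            // waiting for in-flight requests when brokers are unreachable,
            // leaving the sender thread lingering (cf. KAFKA-7763).
            producer.close(Duration.ofSeconds(5));
        }
    }
}
```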

The changes needed in Flink are here: 
https://github.com/aljoscha/flink/tree/flink-17327-kafka-clean-shutdown-2.4. 
The patch for Kafka is attached. I don't think the Kafka project will accept 
that patch, though, because aborting outstanding transactions is the right 
behavior for Kafka Streams/KSQL, where pending transactions that are not 
cancelled would block downstream consumption. 
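
For context, the exactly-once setup that runs into this (per the issue's 
reproduction steps) looks roughly like the sketch below, assuming the 
universal {{FlinkKafkaProducer}} of Flink 1.10. The broker address and topic 
are assumptions; the 1-hour {{transaction.timeout.ms}} mirrors the value the 
reporter used during the test:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // exactly-once semantics require checkpointing

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        // Long transaction timeout, as in the reproduction steps, so the
        // transaction does not expire while the brokers are down.
        props.put("transaction.timeout.ms", "3600000");   // 1 hour

        env.fromElements("a", "b", "c")
           .addSink(new FlinkKafkaProducer<>(
               "output-topic",
               new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
               props,
               FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once-kafka-job");
    }
}
```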



> Kafka unavailability could cause Flink TM shutdown
> --------------------------------------------------
>
>                 Key: FLINK-17327
>                 URL: https://issues.apache.org/jira/browse/FLINK-17327
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Jun Qin
>            Priority: Critical
>             Fix For: 1.12.0
>
>         Attachments: 0001-Change-version-to-2.4.2-ALJOSCHA.patch, 
> 0002-Don-t-abort-in-flight-transactions.patch, Standalonesession.log, TM.log, 
> TM_produer_only_task.log
>
>
> Steps to reproduce:
>  # Start a Flink 1.10 standalone cluster
>  # Run a Flink job which reads from one Kafka topic and writes to another 
> topic, with exactly-once checkpointing enabled
>  # Stop all Kafka Brokers after a few successful checkpoints
> When Kafka brokers are down:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # Then, Flink could not complete snapshot due to {{Timeout expired while 
> initializing transactional state in 60000ms}}
>  # After several snapshot failures, Flink reported {{Too many ongoing 
> snapshots. Increase kafka producers pool size or decrease number of 
> concurrent checkpoints.}}
>  # Eventually, Flink tried to cancel the task, which did not succeed within 
> 3 min. According to the logs, the consumer was cancelled, but the producer 
> was still running
>  # Then {{Fatal error occurred while executing the TaskManager. Shutting it 
> down...}}
> I will attach the logs to show the details. It is worth noting that if the 
> task contains only a producer and no consumer, the behavior is different:
>  # {{org.apache.kafka.clients.NetworkClient}} reported connection to broker 
> could not be established
>  # after {{delivery.timeout.ms}} (2min by default), producer reports: 
> {{FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for 
> output-topic-0:120001 ms has passed since batch creation}}
>  # Flink tried to cancel the upstream tasks and created a new producer
>  # The new producer obviously reported connectivity issue to brokers
>  # This continues until the Kafka brokers are back. 
>  # Flink reported {{Too many ongoing snapshots. Increase kafka producers pool 
> size or decrease number of concurrent checkpoints.}}
>  # Flink cancelled the tasks and restarted them
>  # The job continues, and new checkpoint succeeded. 
>  # TM runs all the time in this scenario
> I set the Kafka transaction timeout to 1 hour just to avoid transaction 
> timeouts during the test.
> To get a producer-only task, I called {{env.disableOperatorChaining();}} in 
> the second scenario. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
