Hi Marc,

the Kafka producer should be able to create backpressure. Could you try increasing max.block.ms to Long.MAX_VALUE?
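For illustration, something along these lines is what I have in mind (an untested sketch against the universal FlinkKafkaProducer; the class name, SimpleStringSchema and the bootstrap servers are just placeholders for your actual setup):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class BlockingKafkaSink {

    // Builds a sink whose producer blocks on a full buffer instead of
    // expiring batches, so broker slowness shows up as backpressure.
    public static FlinkKafkaProducer<String> create(String topic, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Block send() indefinitely instead of failing after the 60s default.
        props.setProperty("max.block.ms", String.valueOf(Long.MAX_VALUE));
        // Your existing non-default settings, unchanged.
        props.setProperty("compression.type", "snappy");
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("acks", "all");
        props.setProperty("client.dns.lookup", "use_all_dns_ips");
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}

With that in place the sink task should simply stall when Kafka can't keep up, and the backpressure should propagate upstream instead of records expiring.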
The exceptions you shared for the failure case don't look like the root causes of the problem. Could you share the full stacktraces or even the full logs for this time frame? Feel free to send the logs to me directly if you don't want to share them on the list.

Best,

Konstantin

On Thu, Mar 28, 2019 at 2:04 PM Marc Rooding <m...@webresource.nl> wrote:

> Hi
>
> We’ve got a job producing to a Kafka sink. The Kafka topics have a
> retention of 2 weeks. When doing a complete replay, it seems like Flink
> isn’t able to back-pressure or throttle the amount of messages going to
> Kafka, causing the following error:
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Expiring 8396 record(s) for topic-1:120000 ms has
> passed since batch creation
>
> We’re running on Flink 1.7.2 with flink-connector-kafka:1.7.2. Our Kafka
> cluster is running version 2.1.1. The Kafka producer uses all default
> settings except for:
>
> compression.type = snappy
> max.in.flight.requests.per.connection = 1
> acks = all
> client.dns.lookup = use_all_dns_ips
>
> I tried playing around with the buffer and batch settings and increasing
> timeouts, but none seem to be what we need. Increasing
> delivery.timeout.ms and request.timeout.ms solves the initial error, but
> causes the Flink job to fail entirely due to:
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>
> My assumption is that the Kafka producer starts blocking once it notices
> that it can't handle the batches, and Flink eventually runs out of
> buffers for the operator.
>
> What really baffles me is that the backpressure tab shows that everything
> is OK. The entire job pipeline (which reads from 4 different topics,
> unions them all and sinks to 1 topic) pushes all the messages through to
> the sink stage, resulting in 18 million incoming messages in that stage,
> even though Kafka has no way of keeping up with this.
>
> I searched for others facing the same issue but couldn't find anything
> similar. I'm hoping that someone here can guide me in the right direction.
>
> Thanks in advance

--

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen