Hi
We’ve got a job producing to a Kafka sink. The
Kafka topics have a retention of 2 weeks. When doing a
complete replay, it seems like Flink isn’t able to
back-pressure or throttle the number of messages going
to Kafka, which causes the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: Expiring 8396 record(s)
for topic-1:120000 ms has passed since batch creation
We’re running on Flink 1.7.2 with
flink-connector-kafka:1.7.2. Our Kafka cluster is
running version 2.1.1. The Kafka producer uses all
default settings except for:
compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips
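
For context, this is roughly how we build the producer properties and wire them into the sink. It's a simplified sketch: the bootstrap servers, topic name, schema, and stream variable below are placeholders, not our real setup.

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");              // placeholder
    props.setProperty("compression.type", "snappy");
    props.setProperty("max.in.flight.requests.per.connection", "1");
    props.setProperty("acks", "all");
    props.setProperty("client.dns.lookup", "use_all_dns_ips");

    // Universal Kafka connector shipped with Flink 1.7.x
    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
        "output-topic",              // placeholder topic name
        new SimpleStringSchema(),    // placeholder serialization schema
        props);

    unionedStream.addSink(sink);     // unionedStream = union of the 4 input topics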
I tried playing around with the buffer and batch
settings and increasing timeouts (roughly the overrides
sketched below the stack trace), but none of them solved
the problem. Increasing delivery.timeout.ms and
request.timeout.ms makes the initial error go away, but
then the Flink job fails entirely with:
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool
is destroyed.
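
For reference, the overrides I experimented with were along these lines, applied on top of the base properties above. The exact values here are illustrative, not what we finally settled on.

    // Illustrative overrides only; defaults shown in the comments.
    props.setProperty("delivery.timeout.ms", "600000");   // default 120000
    props.setProperty("request.timeout.ms", "300000");    // default 30000
    props.setProperty("batch.size", "262144");            // default 16384
    props.setProperty("linger.ms", "50");                 // default 0
    props.setProperty("buffer.memory", "134217728");      // default 33554432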
My assumption is that the Kafka producer starts
blocking once it notices it can't keep up with the
batches, and that Flink eventually runs out of network
buffers for the operator.
What really baffles me is that the backpressure tab
shows that everything is OK. The entire job pipeline
(which reads from 4 different topics, unions them, and
sinks to 1 topic) pushes all the messages through to the
sink stage, resulting in 18 million incoming messages at
that stage, even though Kafka cannot possibly keep up
with that rate.
I've searched for others facing the same issue but
can't find anything similar. I'm hoping someone here
can point me in the right direction.
Thanks in advance