Alexander Sibiryakov created KAFKA-10335:
--------------------------------------------

             Summary: Blocking of producer IO thread when calling send() from callback
                 Key: KAFKA-10335
                 URL: https://issues.apache.org/jira/browse/KAFKA-10335
             Project: Kafka
          Issue Type: Bug
          Components: clients, producer
            Reporter: Alexander Sibiryakov

We had an application that was supposed to use KafkaProducer to deliver the results of some work. Sometimes delivery of results wasn't successful because of network connectivity errors or maintenance happening on the broker side. In such cases we wanted the application to send an event with the error and the original message details. All good, but we wanted errors to be delivered to a separate topic. So we implemented a callback passed to send(), using the same producer instance and calling send() from there.

This application worked for some time, but then we found that its producer was stuck: almost no CPU consumption, and batches expiring for hours. After connecting with a debugger, it turned out that the sender IO thread was blocked. When a record expired, its callback was invoked. The callback contained a call to send() for a new topic whose metadata was not present in the producer's client cache. When send() is missing metadata, it is allowed to block for up to max.block.ms, which is 60 seconds by default. If the application is active, a large number of records accumulates quickly, and every record blocks the IO thread for 60 s. Therefore the sender IO thread is essentially blocked.

In the producer, only the sender IO thread calls the client's poll() method, which is responsible for all network communication and metadata updates. If poll() is executed with a significant delay, the result is errors caused by various timeouts. That's it: we've got a stuck producer with little chance to recover.

To summarise, the prerequisites for the problem are sending from a callback, using the same producer instance, and using a topic that the producer hasn't seen before.

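To make the mechanism above concrete, here is a minimal sketch in plain Java with no Kafka dependency. It is a toy model, not the producer's actual code: a single-thread executor stands in for the sender IO thread, and the hypothetical blockingSend() stands in for a send() that waits for missing metadata. All class and method names are assumptions made for illustration. It compares blocking inside the callback with handing the follow-up send to a separate thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model (not Kafka code): one thread plays the sender IO thread and runs callbacks serially. */
public class CallbackBlockingSketch {

    /** Hypothetical stand-in for a send() that blocks while waiting for metadata. */
    static void blockingSend(long blockMs) {
        try {
            Thread.sleep(blockMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs `callbacks` completion callbacks on a single simulated IO thread and
     * returns how long that thread needed to get through all of them, in ms.
     * With handOff=false each callback blocks inline; with handOff=true it only
     * enqueues the blocking call on a separate thread.
     */
    static long ioThreadBusyMs(int callbacks, long blockMs, boolean handOff) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        ExecutorService errorSender = Executors.newSingleThreadExecutor();
        try {
            long start = System.nanoTime();
            for (int i = 0; i < callbacks; i++) {
                ioThread.execute(() -> {
                    if (handOff) {
                        errorSender.execute(() -> blockingSend(blockMs));
                    } else {
                        blockingSend(blockMs);
                    }
                });
            }
            // Marker task: completes only after every callback above has run.
            ioThread.submit(() -> { }).get();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
            errorSender.shutdown(); // already queued sends still run to completion
        }
    }

    public static void main(String[] args) {
        System.out.println("inline:   " + ioThreadBusyMs(5, 100, false) + " ms");
        System.out.println("hand-off: " + ioThreadBusyMs(5, 100, true) + " ms");
    }
}
```

In this model the inline variant keeps the simulated IO thread busy for roughly callbacks × blockMs, while the hand-off variant returns almost immediately. Handing off only moves the wait to another thread: it does not shorten it, and the queue in this sketch is unbounded, so it illustrates the problem rather than proposing a fix.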
I think it is important to decide whether this is a misuse of KafkaProducer or a bug in it. Code in callbacks shouldn't block, that is clear, but at the same time no one expects an already initialised producer to block. Depending on the decision I could produce a fix: either a warning when the user calls send() from a callback, or a reduction of the maximum allowed blocking time for metadata updates. It could also be just a docs change, or even nothing.

Here is code to reproduce the issue; the output it produces follows the code snippet. Tested on Confluent Cloud, from my desktop with a 100 Mbps connection.

{code:java}
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// The class name is arbitrary; the original snippet showed only main().
public class ProducerCallbackRepro {
    public static void main(String[] args) throws IOException {
        byte[] blob = new byte[262144];

        Properties properties = new Properties();
        properties.load(new FileReader("kafka-staging.properties"));
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Short timeouts so that batches expire quickly and the callback sees an exception.
        properties.setProperty("request.timeout.ms", "5000");
        properties.setProperty("delivery.timeout.ms", "5000");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);

        while (true) {
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-valid-data", blob);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println(exception);
                        long start = System.currentTimeMillis();
                        // Runs on the sender IO thread and targets a topic the producer has not seen yet.
                        ProducerRecord<String, byte[]> errorRecord = new ProducerRecord<>("alex-test-errors", blob);
                        producer.send(errorRecord); // blocking caused by metadata update
                        long timeElapsed = System.currentTimeMillis() - start;
                        System.err.println("time spent blocking IO thread: " + timeElapsed);
                    }
                }
            });
        }
    }
}
{code}

{noformat}
[2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id =
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 5000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 5000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = PLAIN
	security.protocol = SASL_SSL
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[2020-07-31 14:35:52,099: INFO/main] (AbstractLogin.java:61) - Successfully logged in.
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:117) - Kafka version: 5.4.0-ccs
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:118) - Kafka commitId: f4201a82bea68cc7
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:119) - Kafka startTimeMs: 1596198952287
[2020-07-31 14:35:52,853: INFO/kafka-producer-network-thread | producer-1] (Metadata.java:261) - [Producer clientId=producer-1] Cluster ID: lkc-43m2m
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-0:5001 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60017
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2020-07-31 14:38:07,219: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-3 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60003
[2020-07-31 14:39:07,223: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60001
[2020-07-31 14:40:07,224: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-5 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
[2020-07-31 14:41:07,225: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60004
time spent blocking IO thread: 60004
[2020-07-31 14:42:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60000
[2020-07-31 14:43:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422600 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422490 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60002
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422315 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60003
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422124 ms has passed since batch creation
{noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)