Hi,

I am trying to send messages synchronously to a Kafka cluster (let's call it A). I very frequently get a 'Batch Expired' exception. The average time taken per send is also very high, around 5 seconds.
However, when I use the same code to send messages to a different Kafka cluster B (with the same network latency), the average time per send is 15 ms, which is acceptable. I have checked the Kafka server logs in cluster A and nothing looks abnormal. Confusingly, the ProducerPerformance tool returns good results for cluster A:

    kafka-run-class org.apache.kafka.tools.ProducerPerformance --topic testREPL --num-records 50000 --record-size 1000 --throughput 100 --producer-props acks=1 bootstrap.servers=<broker list> batch.size=8196

Output:

    50000 records sent, 100.000400 records/sec (0.10 MB/sec), 0.52 ms avg latency, 179.00 ms max latency, 1 ms 50th, 1 ms 95th, 1 ms 99th, 3 ms 99.9th.

My producer properties look like this:

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

The producer send looks like this:

    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, partition, Integer.toString(partition), payload);
    producer.send(producerRecord).get();

I have spent a long time trying to debug this, with no luck. I would appreciate any help in fixing it. Below is a sample stack trace from publishing to cluster A:

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
    org.apache.kafka.common.errors.TimeoutException: Batch Expired
    org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
    10.0.128.115 org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
    10.0.128.115 org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)

Thanks,
Sreeram
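
Note: the "average time per send" figures quoted in this post could be measured along the following lines. This is only a minimal sketch, not the exact code behind the numbers above. The class name SendTimer and the method averageMillis are hypothetical, and the Callable is a placeholder for the blocking producer.send(producerRecord).get() call, so that the sketch runs without a Kafka cluster.

```java
import java.util.concurrent.Callable;

/** Times repeated blocking calls and reports the average latency per call. */
public class SendTimer {

    /**
     * Invokes blockingSend `count` times and returns the mean wall-clock
     * time per call in milliseconds. In a real producer, blockingSend would
     * wrap producer.send(record).get(); here it is an assumed stand-in.
     */
    public static double averageMillis(Callable<?> blockingSend, int count) throws Exception {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        long totalNanos = 0L;
        for (int i = 0; i < count; i++) {
            long start = System.nanoTime();
            blockingSend.call();               // blocks until the "send" completes
            totalNanos += System.nanoTime() - start;
        }
        return totalNanos / 1_000_000.0 / count;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder workload: a short sleep imitating one synchronous send.
        double avg = averageMillis(() -> { Thread.sleep(15); return null; }, 20);
        System.out.printf("average time per send: %.2f ms%n", avg);
    }
}
```

Timing each call separately with System.nanoTime() keeps the measurement limited to the blocking send itself, so a comparison between cluster A and cluster B is not skewed by record construction or other application work.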