Julius created KAFKA-5760: ----------------------------- Summary: The kafka Consumer can not poll the message by multi-thread Key: KAFKA-5760 URL: https://issues.apache.org/jira/browse/KAFKA-5760 Project: Kafka Issue Type: Bug Affects Versions: 0.11.0.0 Reporter: Julius
When I poll for messages with a timeout set, the call stays blocked in the poll method, even though my Kafka cluster is running normally. Through debugging I found that it keeps executing the ConsumerNetworkClient.poll method: /** * Block indefinitely until the given request future has finished. * @param future The request future to await. * @throws WakeupException if {@link #wakeup()} is called from another thread * @throws InterruptException if the calling thread is interrupted */ public void poll(RequestFuture<?> future) { while (!future.isDone()) poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future); } I don't know the reason; could you please confirm? Thank you. Java Code: public static Properties getProperties(String groupId, String brokerList) { Properties properties = new Properties(); properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName()); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false)); return properties; } public static void main(String[] args) throws InterruptedException, JMSException { List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>(); for(int i = 0 ; i < 10 ; i++){ Properties props = getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("test")); kafkaConsumers.add(consumer); } ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 30; i++) { final KafkaConsumer<String, String> 
consumer = kafkaConsumers.get(i % kafkaConsumers.size()); executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " start get message , consumer: " + consumer.toString()); System.out.println(consumer.assignment()); ConsumerRecords<String, String> records = consumer.poll(200); System.out.println(Thread.currentThread().getName() + " poll end, consumer: " + consumer.toString()); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, pattition = %d, key = %s, value = %s", record.offset(),record.partition(), record.key(), record.value()); System.out.println(Thread.currentThread().getName()); consumer.commitSync(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } } -- This message was sent by Atlassian JIRA (v6.4.14#64029)