Julius created KAFKA-5760:
-----------------------------
Summary: The kafka Consumer can not poll the message by
multi-thread
Key: KAFKA-5760
URL: https://issues.apache.org/jira/browse/KAFKA-5760
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Julius
When I poll for messages I set a timeout, but the call stays blocked inside the
poll method. My Kafka cluster is running normally.
While debugging, I found that the thread keeps executing the following ConsumerNetworkClient.poll method:
/**
 * Block indefinitely until the given request future has finished.
 * @param future The request future to await.
 * @throws WakeupException if {@link #wakeup()} is called from another thread
 * @throws InterruptException if the calling thread is interrupted
 */
public void poll(RequestFuture<?> future) {
// Re-invokes the three-argument poll, passing MAX_POLL_TIMEOUT_MS each time, for as
// long as the future is not done; this overload takes no caller-supplied timeout.
while (!future.isDone())
poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
}
I don't know the reason for this. Could you please take a look and confirm? Thank you.
Java Code:
/**
 * Builds the consumer configuration shared by the consumers in this reproduction.
 *
 * <p>The returned settings disable auto-commit, cap each poll at one record, use the
 * string deserializer for both keys and values, and set the offset reset policy to the
 * lower-cased name of {@code OffsetResetStrategy.EARLIEST}.
 *
 * @param groupId    value stored under {@code ConsumerConfig.GROUP_ID_CONFIG}
 * @param brokerList value stored under {@code ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}
 * @return a new {@code Properties} instance containing the settings described above
 */
public static Properties getProperties(String groupId, String brokerList) {
    String stringDeserializer =
            org.apache.kafka.common.serialization.StringDeserializer.class.getName();

    Properties properties = new Properties();
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
    // Lower-case with Locale.ROOT: the no-argument toLowerCase() uses the JVM default
    // locale, and under e.g. a Turkish locale "I" becomes a dotless i, so the result
    // would no longer be the plain ASCII string "earliest".
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            OffsetResetStrategy.EARLIEST.name().toLowerCase(java.util.Locale.ROOT));
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));
    return properties;
}
public static void main(String[] args) throws InterruptedException,
JMSException {
List<KafkaConsumer<String, String>> kafkaConsumers = new
ArrayList<>();
for(int i = 0 ; i < 10 ; i++){
Properties props =
getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
kafkaConsumers.add(consumer);
}
ExecutorService executorService =
Executors.newFixedThreadPool(3);
for (int i = 0; i < 30; i++) {
final KafkaConsumer<String, String> consumer =
kafkaConsumers.get(i % kafkaConsumers.size());
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " start get message ,
consumer: " + consumer.toString());
System.out.println(consumer.assignment());
ConsumerRecords<String, String> records
= consumer.poll(200);
System.out.println(Thread.currentThread().getName() + " poll end, consumer: "
+ consumer.toString());
for (ConsumerRecord<String, String>
record : records) {
System.out.printf("offset = %d,
pattition = %d, key = %s, value = %s", record.offset(),record.partition(),
record.key(), record.value());
System.out.println(Thread.currentThread().getName());
consumer.commitSync();
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)