Julius created KAFKA-5760:
-----------------------------

             Summary: The kafka Consumer can not poll the message by multi-thread
                 Key: KAFKA-5760
                 URL: https://issues.apache.org/jira/browse/KAFKA-5760
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.0
            Reporter: Julius


When I poll for messages I set a timeout, but the call stays blocked inside the
poll method even though my Kafka cluster is healthy.
Debugging shows that the thread keeps looping in this ConsumerNetworkClient.poll
method:
    /**
     * Block indefinitely until the given request future has finished.
     * @param future The request future to await.
     * @throws WakeupException if {@link #wakeup()} is called from another thread
     * @throws InterruptException if the calling thread is interrupted
     */
    public void poll(RequestFuture<?> future) {
        while (!future.isDone())
            poll(MAX_POLL_TIMEOUT_MS, time.milliseconds(), future);
    }
I don't know the reason for this. Could you please confirm? Thank you.
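
For readers following the report: the quoted overload takes no timeout of its
own, so a caller that reaches it waits until the future completes (or until a
wakeup or interrupt), whatever value was passed to consumer.poll(). The
stand-alone sketch below uses only the JDK to contrast that loop shape with a
deadline-bounded one. AwaitLoopSketch, pollOnce, awaitUnbounded and awaitBounded
are hypothetical names for illustration, and CompletableFuture stands in for
Kafka's RequestFuture; this is not Kafka's implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not taken from the Kafka code base.
final class AwaitLoopSketch {

    // Stand-in for one bounded network poll: waits at most sliceMs for the future.
    static void pollOnce(CompletableFuture<?> future, long sliceMs) {
        try {
            future.get(sliceMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // This slice elapsed; the caller decides whether to try again.
        } catch (ExecutionException | CancellationException e) {
            // The future finished (exceptionally or cancelled); isDone() is now true.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Same shape as the quoted method: loops until the future is done, no deadline.
    static void awaitUnbounded(CompletableFuture<?> future, long sliceMs) {
        while (!future.isDone()) {
            pollOnce(future, sliceMs);
        }
    }

    // Deadline-bounded variant: gives up after timeoutMs and reports whether it finished.
    static boolean awaitBounded(CompletableFuture<?> future, long timeoutMs, long sliceMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!future.isDone() && System.nanoTime() < deadline) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            pollOnce(future, Math.max(1, Math.min(sliceMs, remainingMs)));
        }
        return future.isDone();
    }
}
```

With a future that never completes, awaitBounded returns false once the
deadline passes, while awaitUnbounded would not return at all.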

Java Code:
    public static Properties getProperties(String groupId, String brokerList) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                OffsetResetStrategy.EARLIEST.name().toLowerCase());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringDeserializer.class.getName());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));
        return properties;
    }

    public static void main(String[] args) throws InterruptedException, JMSException {
        // Ten consumers, all members of the same group and subscribed to the same topic.
        List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Properties props = getProperties("34620a79-f68b-4129-88f1-0e96bb14194e", "*.*.*.*:9092");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test"));
            kafkaConsumers.add(consumer);
        }
        // Thirty one-shot poll tasks are spread over the consumers and run on three threads.
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 30; i++) {
            final KafkaConsumer<String, String> consumer = kafkaConsumers.get(i % kafkaConsumers.size());
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()
                            + "  start get message , consumer: " + consumer.toString());
                    System.out.println(consumer.assignment());
                    ConsumerRecords<String, String> records = consumer.poll(200);
                    System.out.println(Thread.currentThread().getName()
                            + "  poll end, consumer: " + consumer.toString());
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset = %d, partition = %d, key = %s, value = %s",
                                record.offset(), record.partition(), record.key(), record.value());
                        System.out.println(Thread.currentThread().getName());
                        consumer.commitSync();
                        try {
                            Thread.sleep(2000);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }
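
A possible explanation, offered as an assumption rather than a confirmed
diagnosis: all ten consumers share one group id, but the three-thread pool lets
at most three of them be inside poll() at any moment, and as far as I understand
a consumer only rejoins the group during a rebalance while it is polling. The
KafkaConsumer Javadoc also states that the class is not safe for multi-threaded
access and describes one consumer per thread as a usable pattern. The JDK-only
sketch below shows that shape, with every consumer polled continuously by its
own long-lived thread. DedicatedPollLoops, Poller and runLoops are hypothetical
names; Poller stands in for the single KafkaConsumer call the loop needs.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical illustration only; Poller is a stand-in, not a Kafka type.
final class DedicatedPollLoops {

    /** The one consumer operation the loop needs; returns the number of records. */
    interface Poller {
        int poll(long timeoutMs) throws InterruptedException;
    }

    /** Runs one poll loop per poller, each on its own thread, for about runForMs. */
    static int[] runLoops(List<Poller> pollers, long runForMs) {
        int[] result = new int[pollers.size()];
        if (pollers.isEmpty()) {
            return result;
        }
        final AtomicBoolean running = new AtomicBoolean(true);
        final AtomicIntegerArray pollCounts = new AtomicIntegerArray(pollers.size());
        // Pool size equals the number of pollers, so no poller waits for a free thread.
        ExecutorService pool = Executors.newFixedThreadPool(pollers.size());
        for (int i = 0; i < pollers.size(); i++) {
            final int index = i;
            final Poller poller = pollers.get(i);
            pool.execute(() -> {
                try {
                    while (running.get()) {
                        poller.poll(200); // only this thread ever touches this poller
                        pollCounts.incrementAndGet(index);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            Thread.sleep(runForMs);
            running.set(false);
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            running.set(false);
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < result.length; i++) {
            result[i] = pollCounts.get(i);
        }
        return result;
    }
}
```

With a real consumer the loop body would call consumer.poll(...) and process
the returned records, and shutdown would typically go through
consumer.wakeup() from another thread, as the quoted Javadoc mentions.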



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
