I am using kafka_2.10 now. My consumer is stuck in an infinite loop at the highlighted line. Is there any issue with the code?
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("zookeeper.connect", "zkaddress:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "5000");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("topicname", new Integer(1));

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
    List<KafkaStream<byte[], byte[]>> streamList = streamMap.get("topicname");

    System.out.println("fetching...");
    System.out.println("size" + streamList.size());
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        System.out.println("size str" + stream.size());
        ConsumerIterator<byte[], byte[]> strItr = stream.iterator();
        System.out.println("size itr" + strItr.size());
        while (strItr.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msgMeta = strItr.next();
            System.out.println("offset is" + msgMeta.offset());
            System.out.println(msgMeta.message());
        }
    }
}

And how do we specify the batch size while consuming messages?

On Mon, Jun 22, 2015 at 12:23 AM, Sriharsha Chintalapani <ka...@harsha.io> wrote:

> Sushant,
> You are using the kafka-clients new consumer API. It looks like you want
> to use the high-level consumer API? If so, you need to use the following
> Kafka core lib as the dependency:
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.2.1</version>
> </dependency>
>
> More details on this page:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> --
> Harsha
>
>
> On June 21, 2015 at 8:02:01 AM, Shushant Arora (shushantaror...@gmail.com)
> wrote:
>
> Which is the latest jar to be used for the Kafka Java client?
>
> As in
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>0.8.2.1</version>
> </dependency>
>
> In class org.apache.kafka.clients.consumer.KafkaConsumer
>
> public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
>     // TODO Auto-generated method stub
>     return null;
> }
>
> the poll method returns null. I want to use a high-level Java consumer.
>
> And why is the configuration in
> org.apache.kafka.clients.consumer.ConsumerConfig for
> public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
> and not for zookeeper.connect? If the high-level consumer maintains
> offsets in ZooKeeper, why is a broker address required?
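
A note on the "stuck" consumer, offered as a possible explanation rather than a confirmed diagnosis (the highlighting did not survive in plain text, so the exact line is unknown). As far as I understand the 0.8 high-level consumer, KafkaStream is a Scala Iterable and ConsumerIterator a Scala Iterator, so calling size() on either counts elements by walking the iterator. That consumes the messages it counts, and when consumer.timeout.ms is left at its default the walk waits for new messages indefinitely. The sketch below reproduces the effect with plain JDK classes only. BlockingStreamSketch, QueueIterator and countByIterating are hypothetical stand-ins, not Kafka classes, and a finite timeout is used so that the demo terminates.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a blocking message stream; not a Kafka class.
final class BlockingStreamSketch {

    /** Iterator whose hasNext() waits up to timeoutMs for the next element. */
    static final class QueueIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue;
        private final long timeoutMs;
        private T pending; // element fetched by hasNext() but not yet returned

        QueueIterator(BlockingQueue<T> queue, long timeoutMs) {
            this.queue = queue;
            this.timeoutMs = timeoutMs;
        }

        public boolean hasNext() {
            if (pending != null) {
                return true;
            }
            try {
                // Waits for data; a very large timeout would look like a hang.
                pending = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            return pending != null;
        }

        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T value = pending;
            pending = null;
            return value;
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /** Counts elements the only way an iterator allows: by consuming them. */
    static <T> int countByIterating(Iterator<T> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("m1");
        queue.add("m2");
        queue.add("m3");

        // "Asking for the size" drains the queue before any real processing.
        System.out.println("counted " + countByIterating(new QueueIterator<String>(queue, 100)));
        System.out.println("left to read: " + countByIterating(new QueueIterator<String>(queue, 100)));
    }
}
```

Under that assumption, removing the two size() printouts from the consumer loop would leave only the usual blocking in hasNext(), which is expected while the topic has no new messages.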
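
On the batch-size question: to my knowledge the 0.8 high-level consumer sizes its fetches in bytes (fetch.message.max.bytes) and has no setting for a number of messages per batch, so one option is to group messages in application code. The sketch below shows that idea on a plain java.util.Iterator. BatchingSketch and nextBatch are hypothetical names introduced here, not part of Kafka. With a blocking iterator, nextBatch returns only once the batch is full or the iterator reports no more elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Kafka: groups iterator elements into batches.
final class BatchingSketch {

    /**
     * Pulls at most maxBatchSize elements from the source iterator.
     * Returns an empty list when the source has nothing left.
     */
    static <T> List<T> nextBatch(Iterator<T> source, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<T> batch = new ArrayList<T>(maxBatchSize);
        // Check the size first so hasNext() is not called once the batch is full.
        while (batch.size() < maxBatchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        // A finite list stands in for the message stream in this demo.
        Iterator<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5").iterator();
        List<String> batch;
        while (!(batch = nextBatch(messages, 2)).isEmpty()) {
            System.out.println("batch of " + batch.size() + ": " + batch);
        }
    }
}
```

Run as is, the demo prints batches [m1, m2], [m3, m4] and [m5]. In a real consumer the iterator would be the stream's iterator, and a consumer.timeout.ms setting would be needed if partial batches should be flushed when no new messages arrive.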