head() is a Scala method. Calling it from Java requires you to figure out the exact method name it compiles to in bytecode. A simpler way is to use the Java iterable API on KafkaStream: iterate over the stream, and by default the iterator blocks on the hasNext() call when there is no message.
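For example, something along these lines (just a sketch against the 0.8.1.1 high-level consumer Java API, assuming kafkaStream is the KafkaStream<byte[], byte[]> you already obtain in receive(); error handling omitted):

  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.message.MessageAndMetadata;

  ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
  while (it.hasNext()) {
      // hasNext() blocks until a message arrives (unless consumer.timeout.ms is set)
      MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
      byte[] payload = messageAndMetadata.message();
      // hand payload off to whatever processes the message
  }

If you only need one message per receive() call, you can drop the loop and just call it.next() once.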
Thanks,

Jun

On Sat, Sep 13, 2014 at 1:01 AM, Aarti Gupta <aartigup...@gmail.com> wrote:
> The following error is thrown (when I call KafkaStream.head(), as shown in
> the code snippet below):
>
> * WARN - java.lang.NoSuchMethodError:
> kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;*
>
> My use case is that I want to block on the receive() method, and when
> anything is published on the topic, pass the 'head' of the queue to the
> calling method, which processes it.
>
> I do not use partitioning and have a single stream.
>
>
> import com.google.common.collect.Maps;
> import x.x.x.Task;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.consumer.ZookeeperConsumerConnector;
> import kafka.message.MessageAndMetadata;
> import org.codehaus.jettison.json.JSONException;
> import org.codehaus.jettison.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.IOException;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executors;
>
> /**
>  * @author agupta
>  */
> public class KafkaConsumerDelegate implements ConsumerDelegate {
>
>     private ConsumerConnector consumerConnector;
>     private String topicName;
>     private static Logger LOG =
>         LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
>     private final Map<String, Integer> topicCount = Maps.newHashMap();
>     private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
>     private List<KafkaStream<byte[], byte[]>> kafkaStreams;
>
>     @Override
>     public Task receive(final boolean consumerConfirms) {
>         try {
>             LOG.info("Kafka consumer delegate listening on topic " + getTopicName());
>             kafkaStreams = messageStreams.get(getTopicName());
>             final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0);
>             return Executors.newSingleThreadExecutor().submit(new Callable<Task>() {
>                 @Override
>                 public Task call() throws Exception {
>                     *final MessageAndMetadata<byte[], byte[]> messageAndMetadata = kafkaStream.head();*
>
>                     final Task message = new Task() {
>                         @Override
>                         public byte[] getBytes() {
>                             return messageAndMetadata.message();
>                         }
>                     };
>                     return message;
>                 }
>             }).get();
>
>         } catch (Exception e) {
>             LOG.warn("Error in consumer " + e.getMessage());
>         }
>         return null;
>     }
>
>     @Override
>     public void initialize(JSONObject configData, boolean publisherAckMode) throws IOException {
>         try {
>             this.topicName = configData.getString("topicName");
>             LOG.info("Topic name is " + topicName);
>         } catch (JSONException e) {
>             e.printStackTrace();
>             LOG.error("Error parsing configuration", e);
>         }
>         Properties properties = new Properties();
>         properties.put("zookeeper.connect", "localhost:2181");
>         properties.put("group.id", "testgroup");
>         ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>         // only one stream and one topic (since we are not supporting partitioning)
>         topicCount.put(getTopicName(), 1);
>         consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
>         messageStreams = consumerConnector.createMessageStreams(topicCount);
>     }
>
>     @Override
>     public void stop() throws IOException {
>         // TODO
>         throw new UnsupportedOperationException("Method Not Implemented");
>     }
>
>     public String getTopicName() {
>         return this.topicName;
>     }
> }
>
>
> Lastly, I am using the following binary
>
> kafka_2.8.0-0.8.1.1
>
> and the following maven dependency
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.1.1</version>
> </dependency>
>
> Any suggestions?
>
> Thanks
> aarti
>