[ https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
aarti gupta updated KAFKA-1632: ------------------------------- Description: The following error is thrown (when I call KafkaStream.head(), as shown in the code snippet below) WARN - java.lang.NoSuchMethodError: kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata; My use case is that I want to block on the receive() method, and when a message is published on the topic, I return the 'head' of the queue to the calling method, which processes it. I do not use partitioning and have a single stream. import com.google.common.collect.Maps; import x.x.x.Task; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.consumer.ZookeeperConsumerConnector; import kafka.message.MessageAndMetadata; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; /** * @author agupta */ public class KafkaConsumerDelegate implements ConsumerDelegate { private ConsumerConnector consumerConnector; private String topicName; private static Logger LOG = LoggerFactory.getLogger(KafkaProducerDelegate.class.getName()); private final Map<String, Integer> topicCount = Maps.newHashMap(); private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams; private List<KafkaStream<byte[], byte[]>> kafkaStreams; @Override public Task receive(final boolean consumerConfirms) { try { LOG.info("Kafka consumer delegate listening on topic " + getTopicName()); kafkaStreams = messageStreams.get(getTopicName()); final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0); return Executors.newSingleThreadExecutor().submit(new Callable<Task>() { @Override public Task call() throws Exception { final MessageAndMetadata<byte[], byte[]> 
messageAndMetadata= kafkaStream.head(); final Task message = new Task() { @Override public byte[] getBytes() { return messageAndMetadata.message(); } }; return message; } }).get(); } catch (Exception e) { LOG.warn("Error in consumer " + e.getMessage()); } return null; } @Override public void initialize(JSONObject configData, boolean publisherAckMode) throws IOException { try { this.topicName = configData.getString("topicName"); LOG.info("Topic name is " + topicName); } catch (JSONException e) { e.printStackTrace(); LOG.error("Error parsing configuration", e); } Properties properties = new Properties(); properties.put("zookeeper.connect", "localhost:2181"); properties.put("group.id", "testgroup"); ConsumerConfig consumerConfig = new ConsumerConfig(properties); //only one stream, and one topic, (Since we are not supporting partitioning) topicCount.put(getTopicName(), 1); consumerConnector = new ZookeeperConsumerConnector(consumerConfig); messageStreams = consumerConnector.createMessageStreams(topicCount); } @Override public void stop() throws IOException { //TODO throw new UnsupportedOperationException("Method Not Implemented"); } public String getTopicName() { return this.topicName; } } Lastly, I am using the following binary kafka_2.8.0-0.8.1.1 and the following maven dependency <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> </dependency> Any suggestions? Thanks aarti was: \The following error is thrown, (when I call KafkaStream.head(), as shown in the code snippet below) WARN - java.lang.NoSuchMethodError: kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata; My use case, is that I want to block on the receive() method, and when anything is published on the topic, I 'head' of the queue to the calling method, that processes it. I do not use partitioning and have a single stream. 
import com.google.common.collect.Maps; import x.x.x.Task; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.consumer.ZookeeperConsumerConnector; import kafka.message.MessageAndMetadata; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; /** * @author agupta */ public class KafkaConsumerDelegate implements ConsumerDelegate { private ConsumerConnector consumerConnector; private String topicName; private static Logger LOG = LoggerFactory.getLogger(KafkaProducerDelegate.class.getName()); private final Map<String, Integer> topicCount = Maps.newHashMap(); private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams; private List<KafkaStream<byte[], byte[]>> kafkaStreams; @Override public Task receive(final boolean consumerConfirms) { try { LOG.info("Kafka consumer delegate listening on topic " + getTopicName()); kafkaStreams = messageStreams.get(getTopicName()); final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0); return Executors.newSingleThreadExecutor().submit(new Callable<Task>() { @Override public Task call() throws Exception { final MessageAndMetadata<byte[], byte[]> messageAndMetadata= kafkaStream.head(); final Task message = new Task() { @Override public byte[] getBytes() { return messageAndMetadata.message(); } }; return message; } }).get(); } catch (Exception e) { LOG.warn("Error in consumer " + e.getMessage()); } return null; } @Override public void initialize(JSONObject configData, boolean publisherAckMode) throws IOException { try { this.topicName = configData.getString("topicName"); LOG.info("Topic name is " + topicName); } catch (JSONException e) { 
e.printStackTrace(); LOG.error("Error parsing configuration", e); } Properties properties = new Properties(); properties.put("zookeeper.connect", "localhost:2181"); properties.put("group.id", "testgroup"); ConsumerConfig consumerConfig = new ConsumerConfig(properties); //only one stream, and one topic, (Since we are not supporting partitioning) topicCount.put(getTopicName(), 1); consumerConnector = new ZookeeperConsumerConnector(consumerConfig); messageStreams = consumerConnector.createMessageStreams(topicCount); } @Override public void stop() throws IOException { //TODO throw new UnsupportedOperationException("Method Not Implemented"); } public String getTopicName() { return this.topicName; } } Lastly, I am using the following binary kafka_2.8.0-0.8.1.1 and the following maven dependency <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> </dependency> Any suggestions? Thanks aarti > No such method error on KafkaStream.head > ---------------------------------------- > > Key: KAFKA-1632 > URL: https://issues.apache.org/jira/browse/KAFKA-1632 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.0 > Reporter: aarti gupta > > The following error is thrown (when I call KafkaStream.head(), as shown in > the code snippet below) > WARN - java.lang.NoSuchMethodError: > kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata; > My use case is that I want to block on the receive() method, and when a > message is published on the topic, I return the 'head' of the queue to the calling > method, which processes it. > I do not use partitioning and have a single stream. 
> import com.google.common.collect.Maps; > import x.x.x.Task; > import kafka.consumer.ConsumerConfig; > import kafka.consumer.KafkaStream; > import kafka.javaapi.consumer.ConsumerConnector; > import kafka.javaapi.consumer.ZookeeperConsumerConnector; > import kafka.message.MessageAndMetadata; > import org.codehaus.jettison.json.JSONException; > import org.codehaus.jettison.json.JSONObject; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import java.io.IOException; > import java.util.List; > import java.util.Map; > import java.util.Properties; > import java.util.concurrent.Callable; > import java.util.concurrent.Executors; > /** > * @author agupta > */ > public class KafkaConsumerDelegate implements ConsumerDelegate { > private ConsumerConnector consumerConnector; > private String topicName; > private static Logger LOG = > LoggerFactory.getLogger(KafkaProducerDelegate.class.getName()); > private final Map<String, Integer> topicCount = Maps.newHashMap(); > private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams; > private List<KafkaStream<byte[], byte[]>> kafkaStreams; > @Override > public Task receive(final boolean consumerConfirms) { > try { > LOG.info("Kafka consumer delegate listening on topic " + > getTopicName()); > kafkaStreams = messageStreams.get(getTopicName()); > final KafkaStream<byte[], byte[]> kafkaStream = > kafkaStreams.get(0); > return Executors.newSingleThreadExecutor().submit(new > Callable<Task>() { > @Override > public Task call() throws Exception { > final MessageAndMetadata<byte[], byte[]> > messageAndMetadata= kafkaStream.head(); > final Task message = new Task() { > @Override > public byte[] getBytes() { > return messageAndMetadata.message(); > } > }; > return message; > } > }).get(); > } catch (Exception e) { > LOG.warn("Error in consumer " + e.getMessage()); > } > return null; > } > @Override > public void initialize(JSONObject configData, boolean publisherAckMode) > throws IOException { > try { > this.topicName 
= configData.getString("topicName"); > LOG.info("Topic name is " + topicName); > } catch (JSONException e) { > e.printStackTrace(); > LOG.error("Error parsing configuration", e); > } > Properties properties = new Properties(); > properties.put("zookeeper.connect", "localhost:2181"); > properties.put("group.id", "testgroup"); > ConsumerConfig consumerConfig = new ConsumerConfig(properties); > //only one stream, and one topic, (Since we are not supporting > partitioning) > topicCount.put(getTopicName(), 1); > consumerConnector = new ZookeeperConsumerConnector(consumerConfig); > messageStreams = consumerConnector.createMessageStreams(topicCount); > } > @Override > public void stop() throws IOException { > //TODO > throw new UnsupportedOperationException("Method Not Implemented"); > } > public String getTopicName() { > return this.topicName; > } > } > Lastly, I am using the following binary > kafka_2.8.0-0.8.1.1 > and the following maven dependency > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.10</artifactId> > <version>0.8.1.1</version> > </dependency> > Any suggestions? > Thanks > aarti -- This message was sent by Atlassian JIRA (v6.3.4#6332)