[ https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
aarti gupta updated KAFKA-1632: ------------------------------- Description: \The following error is thrown, (when I call KafkaStream.head(), as shown in the code snippet below) WARN - java.lang.NoSuchMethodError: kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata; My use case, is that I want to block on the receive() method, and when anything is published on the topic, I 'head' of the queue to the calling method, that processes it. I do not use partitioning and have a single stream. import com.google.common.collect.Maps; import x.x.x.Task; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.consumer.ZookeeperConsumerConnector; import kafka.message.MessageAndMetadata; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; /** * @author agupta */ public class KafkaConsumerDelegate implements ConsumerDelegate { private ConsumerConnector consumerConnector; private String topicName; private static Logger LOG = LoggerFactory.getLogger(KafkaProducerDelegate.class.getName()); private final Map<String, Integer> topicCount = Maps.newHashMap(); private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams; private List<KafkaStream<byte[], byte[]>> kafkaStreams; @Override public Task receive(final boolean consumerConfirms) { try { LOG.info("Kafka consumer delegate listening on topic " + getTopicName()); kafkaStreams = messageStreams.get(getTopicName()); final KafkaStream<byte[], byte[]> kafkaStream = kafkaStreams.get(0); return Executors.newSingleThreadExecutor().submit(new Callable<Task>() { @Override public Task call() throws Exception { final MessageAndMetadata<byte[], byte[]> messageAndMetadata= kafkaStream.head(); final Task message = new Task() { @Override public byte[] getBytes() { return messageAndMetadata.message(); } }; return message; } }).get(); } catch (Exception e) { LOG.warn("Error in consumer " + e.getMessage()); } return null; } @Override public void initialize(JSONObject configData, boolean publisherAckMode) throws IOException { try { this.topicName = configData.getString("topicName"); LOG.info("Topic name is " + topicName); } catch (JSONException e) { e.printStackTrace(); LOG.error("Error parsing configuration", e); } Properties properties = new Properties(); properties.put("zookeeper.connect", "localhost:2181"); properties.put("group.id", "testgroup"); ConsumerConfig consumerConfig = new ConsumerConfig(properties); //only one stream, and one topic, (Since we are not supporting partitioning) topicCount.put(getTopicName(), 1); consumerConnector = new ZookeeperConsumerConnector(consumerConfig); messageStreams = consumerConnector.createMessageStreams(topicCount); } @Override public void stop() throws IOException { //TODO throw new UnsupportedOperationException("Method Not Implemented"); } public String getTopicName() { return this.topicName; } } Lastly, I am using the following binary kafka_2.8.0-0.8.1.1 and the following maven dependency <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> </dependency> Any suggestions? Thanks aarti was: Hi Kafka team, We use Kafka to send messages in an high volume/memory crazy application which uses Parallel GC. We send messages at the rate of 12500/min in the first few hours and then the number of messages drop down to 6000/min. Our application usually runs for a maximum of 24 hours What we have: 1) When we do not send messages through Kafka Producer 0.8, then our application never slows down much and our entire process completes within 24 hours 2) When we use Kafka, our machines slow down in sending messages to around 2500/min and as time progresses, the number of messages being sent is even lesser 3) We suspect that our application spends more time in GC and hence the problem. The Heap Dump does not contain an leak suspect with Kafka, but this slowness happens only when Kafka messaging system is used. Any pointers that could help us resolve this issue will be highly appreciated. > No such method error on KafkaStream.head > ---------------------------------------- > > Key: KAFKA-1632 > URL: https://issues.apache.org/jira/browse/KAFKA-1632 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8.0 > Reporter: aarti gupta > Priority: Critical > > \The following error is thrown, (when I call KafkaStream.head(), as shown in > the code snippet below) > WARN - java.lang.NoSuchMethodError: > kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata; > My use case, is that I want to block on the receive() method, and when > anything is published on the topic, I 'head' of the queue to the calling > method, that processes it. > I do not use partitioning and have a single stream. > import com.google.common.collect.Maps; > import x.x.x.Task; > import kafka.consumer.ConsumerConfig; > import kafka.consumer.KafkaStream; > import kafka.javaapi.consumer.ConsumerConnector; > import kafka.javaapi.consumer.ZookeeperConsumerConnector; > import kafka.message.MessageAndMetadata; > import org.codehaus.jettison.json.JSONException; > import org.codehaus.jettison.json.JSONObject; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import java.io.IOException; > import java.util.List; > import java.util.Map; > import java.util.Properties; > import java.util.concurrent.Callable; > import java.util.concurrent.Executors; > /** > * @author agupta > */ > public class KafkaConsumerDelegate implements ConsumerDelegate { > private ConsumerConnector consumerConnector; > private String topicName; > private static Logger LOG = > LoggerFactory.getLogger(KafkaProducerDelegate.class.getName()); > private final Map<String, Integer> topicCount = Maps.newHashMap(); > private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams; > private List<KafkaStream<byte[], byte[]>> kafkaStreams; > @Override > public Task receive(final boolean consumerConfirms) { > try { > LOG.info("Kafka consumer delegate listening on topic " + > getTopicName()); > kafkaStreams = messageStreams.get(getTopicName()); > final KafkaStream<byte[], byte[]> kafkaStream = > kafkaStreams.get(0); > return Executors.newSingleThreadExecutor().submit(new > Callable<Task>() { > @Override > public Task call() throws Exception { > final MessageAndMetadata<byte[], byte[]> > messageAndMetadata= kafkaStream.head(); > final Task message = new Task() { > @Override > public byte[] getBytes() { > return messageAndMetadata.message(); > } > }; > return message; > } > }).get(); > } catch (Exception e) { > LOG.warn("Error in consumer " + e.getMessage()); > } > return null; > } > @Override > public void initialize(JSONObject configData, boolean publisherAckMode) > throws IOException { > try { > this.topicName = configData.getString("topicName"); > LOG.info("Topic name is " + topicName); > } catch (JSONException e) { > e.printStackTrace(); > LOG.error("Error parsing configuration", e); > } > Properties properties = new Properties(); > properties.put("zookeeper.connect", "localhost:2181"); > properties.put("group.id", "testgroup"); > ConsumerConfig consumerConfig = new ConsumerConfig(properties); > //only one stream, and one topic, (Since we are not supporting > partitioning) > topicCount.put(getTopicName(), 1); > consumerConnector = new ZookeeperConsumerConnector(consumerConfig); > messageStreams = consumerConnector.createMessageStreams(topicCount); > } > @Override > public void stop() throws IOException { > //TODO > throw new UnsupportedOperationException("Method Not Implemented"); > } > public String getTopicName() { > return this.topicName; > } > } > Lastly, I am using the following binary > kafka_2.8.0-0.8.1.1 > and the following maven dependency > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.10</artifactId> > <version>0.8.1.1</version> > </dependency> > Any suggestions? > Thanks > aarti -- This message was sent by Atlassian JIRA (v6.3.4#6332)