[ https://issues.apache.org/jira/browse/KAFKA-945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13686841#comment-13686841 ]
Jason Rosenberg commented on KAFKA-945:
---------------------------------------

Sam, thanks, that solved it.

So it appears that the default for auto.offset.reset changed in 0.8: it is now 'largest', whereas the old 'autooffset.reset' defaulted to 'smallest'. Seems an odd change.

Jason

> Problem with test to send a message and then consume it
> -------------------------------------------------------
>
>                 Key: KAFKA-945
>                 URL: https://issues.apache.org/jira/browse/KAFKA-945
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jason Rosenberg
>         Attachments: kafak-945.out, kafka-945.tar.gz
>
>
> A simple test, which sends one message synchronously and then consumes it,
> is failing with the latest 0.8 beta release candidate (produced from sha:
> 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
> Note this problem did not occur with a previous version of the 0.8 branch
> (e.g. it seems to work fine for sha:
> 988d4d8e65a14390abd748318a64e281e4a37c19).
> Essentially, it appears that the message never gets sent (after complaining
> about missing partition leader, etc.).
> To reproduce, run the sample zookeeper and kafka scripts that come with the
> distribution (but first delete all pre-existing state by removing the data
> directories that zookeeper and kafka use):
> rm -rf /tmp/zookeeper
> rm -rf /tmp/kafka_logs
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> Then execute the simple test code (I will attach a tarball which you should
> be able to unpack and run this example).
>   @Test public void produceAndConsumeMessage() throws Exception {
>     // assumes zookeeper and kafka server are running.
>     String zkConnect = "localhost:2181";
>     int port = 9092;
>     Properties pProps = new Properties();
>     pProps.put("metadata.broker.list", "localhost:" + port);
>     pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>     ProducerConfig pConfig = new ProducerConfig(pProps);
>     Producer<Integer, String> producer =
>         new Producer<Integer, String>(pConfig);
>     KeyedMessage<Integer, String> data =
>         new KeyedMessage<Integer, String>("test-topic", "test-message");
>     producer.send(data);
>     producer.close();
>     Properties cProps = new Properties();
>     cProps.put("zookeeper.connect", zkConnect);
>     cProps.put("group.id", "group1");
>     ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>     ConsumerConnector consumerConnector =
>         Consumer.createJavaConsumerConnector(consumerConfig);
>     Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>         consumerConnector.createMessageStreams(
>             ImmutableMap.of("test-topic", 1));
>     List<KafkaStream<byte[], byte[]>> streams =
>         topicMessageStreams.get("test-topic");
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
>     final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>     // run in a separate thread
>     final AtomicBoolean success = new AtomicBoolean(false);
>     Thread consumeThread = new Thread(new Runnable() {
>       public void run() {
>         while (iter.hasNext()) {
>           byte[] msg = iter.next().message();
>           String msgStr = new String(msg);
>           success.set(msgStr.equals("test-message"));
>           break;
>         }
>       }
>     });
>     consumeThread.start();
>     consumeThread.join();
>     consumerConnector.shutdown();
>     assertTrue(success.get());
>   }
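
For anyone hitting the same failure, here is a minimal sketch of the consumer-side change discussed in the comment above. It assumes the fix amounts to setting auto.offset.reset to 'smallest' explicitly, so the test no longer depends on the 0.8 default. The class and method names (ConsumerPropsSketch, consumerProps) are made up for illustration and are not part of Kafka or of the attached tarball. Only java.util.Properties is used here, so the result would still need to be passed to ConsumerConfig as in the test.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka or the attached test code.
class ConsumerPropsSketch {

    // Builds the same consumer properties as the test, plus an explicit
    // auto.offset.reset so the outcome does not depend on the 0.8 default.
    static Properties consumerProps(String zkConnect, String groupId) {
        Properties cProps = new Properties();
        cProps.put("zookeeper.connect", zkConnect);
        cProps.put("group.id", groupId);
        // Assumption: with "smallest", a group that has no committed offset
        // starts from the earliest available message rather than the log end.
        cProps.put("auto.offset.reset", "smallest");
        return cProps;
    }

    public static void main(String[] args) {
        Properties cProps = consumerProps("localhost:2181", "group1");
        System.out.println(cProps.getProperty("auto.offset.reset"));
    }
}
```

In the test, these properties would take the place of the hand-built cProps before new ConsumerConfig(cProps) is called. The setting only applies when the group has no committed offset in zookeeper, which should hold after the clean-up steps in the description.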