[ https://issues.apache.org/jira/browse/KAFKA-945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Rosenberg updated KAFKA-945: ---------------------------------- Attachment: kafak-945.out Here's the output when I run this test on my machine, in case there might be some weird interaction with my machine (a mac laptop). One thing that seems weird in this output, is that it seems like a second producer is started up after the consumer has started up, which makes no sense looking at the code (it should have been a sync producer). > Problem with test to send a message and then consume it > ------------------------------------------------------- > > Key: KAFKA-945 > URL: https://issues.apache.org/jira/browse/KAFKA-945 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.8 > Reporter: Jason Rosenberg > Attachments: kafak-945.out, kafka-945.tar.gz > > > A simple test, which sends on message synchronously, and then consumes it, is > failing, with the latest 0.8 beta release candidate (produced from sha: > 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4 > Note this problem did not occur with a previous version of the 0.8 branch > (e.g. it seems to work fine for sha: > 988d4d8e65a14390abd748318a64e281e4a37c19). > Essentially, it appears that the message never gets sent (after complaining > about missing partition leader, etc.). > To reproduce, run the sample zookeeper and kafka scripts, that come with the > distribution (but first delete all pre-existing state by removing the data > directories that zookeeper and kafka use: > rm -rf /tmp/zookeeper > rm -rf/tmp/kafka_logs > ./bin/zookeeper-server-start.sh config/zookeeper.properties > ./bin/kafka-server-start.sh config/server.properties > Then execute the simple test code (I will attach a tarball which you should > be able to unpack and run this example). > @Test public void produceAndConsumeMessage() throws Exception { > // assumes zookeeper and kafka server are running. > String zkConnect = "localhost:2181"; > int port = 9092; > Properties pProps = new Properties(); > pProps.put("metadata.broker.list", "localhost:" + port); > pProps.put("serializer.class", "kafka.serializer.StringEncoder"); > ProducerConfig pConfig = new ProducerConfig(pProps); > Producer<Integer, String> producer = new Producer<Integer, > String>(pConfig); > KeyedMessage<Integer, String> data = > new KeyedMessage<Integer, String>("test-topic", "test-message"); > producer.send(data); > producer.close(); > Properties cProps = new Properties(); > cProps.put("zookeeper.connect", zkConnect); > cProps.put("group.id", "group1"); > ConsumerConfig consumerConfig = new ConsumerConfig(cProps); > ConsumerConnector consumerConnector = > Consumer.createJavaConsumerConnector(consumerConfig); > Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = > consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", > 1)); > List<KafkaStream<byte[], byte[]>> streams = > topicMessageStreams.get("test-topic"); > final KafkaStream<byte[], byte[]> stream = streams.get(0); > final ConsumerIterator<byte[], byte[]> iter = stream.iterator(); > // run in a separate thread > final AtomicBoolean success = new AtomicBoolean(false); > Thread consumeThread = new Thread(new Runnable() { > public void run() { > while (iter.hasNext()) { > byte[] msg = iter.next().message(); > String msgStr = new String(msg); > success.set(msgStr.equals("test-message")); > break; > } > } > }); > consumeThread.start(); > consumeThread.join(); > consumerConnector.shutdown(); > assertTrue(success.get()); > } -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira