[ 
https://issues.apache.org/jira/browse/KAFKA-945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13686841#comment-13686841
 ] 

Jason Rosenberg commented on KAFKA-945:
---------------------------------------

Sam, thanks, that solved it.  So, it appears that a major change happened with 
the default settings of auto.offset.reset with 0.8 (it's largest, whereas the 
old 'autooffset.reset' defaults to smallest).

Seems an odd change.

Jason
                
> Problem with test to send a message and then consume it
> -------------------------------------------------------
>
>                 Key: KAFKA-945
>                 URL: https://issues.apache.org/jira/browse/KAFKA-945
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jason Rosenberg
>         Attachments: kafak-945.out, kafka-945.tar.gz
>
>
> A simple test, which sends on message synchronously, and then consumes it, is 
> failing, with the latest 0.8 beta release candidate (produced from sha: 
> 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4
> Note this problem did not occur with a previous version of the 0.8 branch 
> (e.g. it seems to work fine for sha:  
> 988d4d8e65a14390abd748318a64e281e4a37c19).
> Essentially, it appears that the message never gets sent (after complaining 
> about missing partition leader, etc.).
> To reproduce, run the sample zookeeper and kafka scripts, that come with the 
> distribution (but first delete all pre-existing state by removing the data 
> directories that zookeeper and kafka use:
> rm -rf /tmp/zookeeper
> rm -rf/tmp/kafka_logs
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> Then execute the simple test code (I will attach a tarball which you should 
> be able to unpack and run this example).
>   @Test public void produceAndConsumeMessage() throws Exception {
>     // assumes zookeeper and kafka server are running.
>     String zkConnect = "localhost:2181";
>     int port = 9092;
>     Properties pProps = new Properties();
>     pProps.put("metadata.broker.list", "localhost:" + port);
>     pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>     ProducerConfig pConfig = new ProducerConfig(pProps);
>     Producer<Integer, String> producer = new Producer<Integer, 
> String>(pConfig);
>     KeyedMessage<Integer, String> data =
>         new KeyedMessage<Integer, String>("test-topic", "test-message");
>     producer.send(data);
>     producer.close();
>     Properties cProps = new Properties();
>     cProps.put("zookeeper.connect", zkConnect);
>     cProps.put("group.id", "group1");
>     ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>     ConsumerConnector consumerConnector = 
> Consumer.createJavaConsumerConnector(consumerConfig);
>     Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>         consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 
> 1));
>     List<KafkaStream<byte[], byte[]>> streams = 
> topicMessageStreams.get("test-topic");
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
>     final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>     // run in a separate thread
>     final AtomicBoolean success = new AtomicBoolean(false);
>     Thread consumeThread = new Thread(new Runnable() {
>       public void run() {
>         while (iter.hasNext()) {
>           byte[] msg = iter.next().message();
>           String msgStr = new String(msg);
>           success.set(msgStr.equals("test-message"));
>           break;
>         }
>       }
>     });
>     consumeThread.start();
>     consumeThread.join();
>     consumerConnector.shutdown();
>     assertTrue(success.get());
>   }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to