Hi Li Li,

If you use the same consumer group id, then offsets may have already been
committed to Kafka, and hence messages before those offsets will not be consumed.
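
To illustrate the point above: if you want to re-read a topic from the beginning, one option is to start with a group id that has no committed offsets, so that auto.offset.reset applies. Here is a minimal sketch of building such properties for the 0.8 high-level consumer (the class and method names are illustrative, not from this thread):

```java
import java.util.Properties;

// Sketch: high-level consumer properties that avoid reusing committed
// offsets. FreshGroupConsumerProps is an illustrative helper name.
public class FreshGroupConsumerProps {

    public static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // A group id with no committed offsets makes auto.offset.reset apply.
        props.put("group.id", groupId);
        // "smallest" starts from the earliest available offset when the
        // group has no committed offset yet (0.8.x high-level consumer).
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Appending a timestamp gives a group id that has never committed.
        Properties p = build("linux157:2181",
                "testgroup-" + System.currentTimeMillis());
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```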

Guozhang


On Mon, Jun 23, 2014 at 6:09 PM, Li Li <fancye...@gmail.com> wrote:

> no luck by adding props.put("auto.offset.reset", "smallest");
> but running the consumer after the producer works.
> But in my use case that is not always true.
> Another question: the consumer should remember the offset. As it is,
> it's not very easy to use.
>
> On Mon, Jun 23, 2014 at 11:05 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> > Did you start the consumer after the producer? The default behavior of
> > the consumer is to "consume from the tail of the log", and hence if there
> > are no new messages coming in after the consumer started, it will get
> > nothing. You may set
> >
> > auto.offset.reset="smallest"
> >
> > and try again.
> >
> > Guozhang
> >
> >
> > On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fancye...@gmail.com> wrote:
> >
> >> hi all,
> >>    I am reading the book "Apache Kafka" and wrote a simple producer
> >> and consumer class. The producer works but the consumer hangs.
> >>    The producer class:
> >> public static void main(String[] args) {
> >>     String topic = "test-topic";
> >>     Properties props = new Properties();
> >>     props.put("metadata.broker.list", "linux157:9092");
> >>     props.put("serializer.class", "kafka.serializer.StringEncoder");
> >>     props.put("request.required.acks", "1");
> >>     ProducerConfig config = new ProducerConfig(props);
> >>     Producer<Integer, String> producer =
> >>             new Producer<Integer, String>(config);
> >>     for (int i = 0; i < 100; i++) {
> >>         KeyedMessage<Integer, String> data =
> >>                 new KeyedMessage<Integer, String>(topic, "msg" + i);
> >>         producer.send(data);
> >>     }
> >>     producer.close();
> >> }
> >>
> >> public class TestKafkaConsumer {
> >>     private final ConsumerConnector consumer;
> >>     private final String topic;
> >>
> >>     public TestKafkaConsumer(String zookeeper, String groupId,
> >>             String topic) {
> >>         Properties props = new Properties();
> >>         props.put("zookeeper.connect", zookeeper);
> >>         props.put("group.id", groupId);
> >>         props.put("zookeeper.session.timeout.ms", "500");
> >>         props.put("zookeeper.sync.time.ms", "250");
> >>         props.put("auto.commit.interval.ms", "1000");
> >>         consumer = Consumer.createJavaConsumerConnector(
> >>                 new ConsumerConfig(props));
> >>         this.topic = topic;
> >>     }
> >>
> >> public void testConsumer() {
> >>     Map<String, Integer> topicCount = new HashMap<String, Integer>();
> >>     // Define a single thread for the topic
> >>     topicCount.put(topic, new Integer(1));
> >>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
> >>             consumer.createMessageStreams(topicCount);
> >>     List<KafkaStream<byte[], byte[]>> streams =
> >>             consumerStreams.get(topic);
> >>     for (final KafkaStream<byte[], byte[]> stream : streams) {
> >>         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
> >>         while (consumerIte.hasNext())
> >>             System.out.println("Message from Single Topic :: "
> >>                     + new String(consumerIte.next().message()));
> >>     }
> >>     if (consumer != null)
> >>         consumer.shutdown();
> >> }
> >>
> >> public static void main(String[] args) {
> >>     String topic = "test-topic";
> >>     TestKafkaConsumer simpleHLConsumer =
> >>             new TestKafkaConsumer("linux157:2181", "testgroup22", topic);
> >>     simpleHLConsumer.testConsumer();
> >> }
> >>
> >> }
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
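
On the "consumer hangs" symptom in the quoted code: with the 0.8 high-level consumer, ConsumerIterator.hasNext() blocks forever by default when there is nothing to read, so the loop never reaches consumer.shutdown(). A hedged sketch of the relevant consumer properties, where the timeout value is only illustrative:

```
zookeeper.connect=linux157:2181
group.id=testgroup22
# Read from the earliest offset when the group has no committed offset yet.
auto.offset.reset=smallest
# Optional: make hasNext() throw ConsumerTimeoutException after 5 s of
# inactivity instead of blocking forever, so the loop can exit cleanly
# (the default of -1 blocks indefinitely).
consumer.timeout.ms=5000
```

When consumer.timeout.ms is set, the read loop should wrap hasNext() in a try/catch for ConsumerTimeoutException and call shutdown() afterwards.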



-- 
-- Guozhang
