I've been trying to write a test consumer in Java for a new use of our
Kafka cluster (currently used solely with Storm). This use needs to
always start from the earliest offset in the topic.  From reading
around, it looked like setting "autooffset.reset" = "smallest" would do
this, but I'm not actually seeing that behavior.  So far the only way
I've managed to force it to start from the beginning is to use a new
groupid with each run, which does not seem like an optimal method.
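
For reference, the new-groupid workaround can be sketched roughly as
follows. This is only a minimal illustration: the class name
FreshGroupConsumerProps and the build() helper are hypothetical, and
the property keys are the same ones used in the constructor in this
post. Each run presumably leaves another consumer group entry behind
in ZooKeeper, which is part of why it doesn't feel like the right
approach.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper (not part of Kafka): builds consumer properties
// with a group id that is different on every call.
public class FreshGroupConsumerProps {

    public static Properties build(String zkQuorum, String groupPrefix) {
        Properties props = new Properties();
        props.put("zk.connect", zkQuorum);
        // A random suffix means the group has never been used before,
        // so there are no previously committed offsets for it.
        props.put("groupid", groupPrefix + "-" + UUID.randomUUID());
        props.put("autooffset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:2181" is a placeholder ZooKeeper address.
        Properties props = build("localhost:2181", "test_group");
        System.out.println(props.getProperty("groupid"));
    }
}
```

The resulting Properties would then be passed to new
ConsumerConfig(props) exactly as in the constructor.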

Am I doing this incorrectly, or is there some other means of doing this
from the Java API?

    public KafkaUpdateSource(String zkQuorum, String topic, String group) {
        Properties props = new Properties();
        props.put("zk.connect", zkQuorum);
        props.put("zk.connectiontimeout.ms", "100000");

        if (group != null) {
            props.put("groupid", group);
        } else {
            props.put("groupid", "test_group");
        }
        props.put("zk.synctime.ms", "200");
        props.put("autocommit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "1000");

        // XXX: For some reason the "start from smallest offset" option
        // doesn't seem to work
        props.put("autooffset.reset", "smallest");

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        iterator =
            consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
    }

- Adam
