> From reading around it looked like setting "autooffset.reset" = "smallest" would do this, however I'm not actually seeing that behavior.

The reason is that a consumer consults this config only if it doesn't find a previously stored offset for its group in ZooKeeper. So it respects this config the first time a group starts up and not on subsequent runs, unless you delete the group's information from ZooKeeper before starting the consumer. That is why you see the right behavior with a new group.id.

Thanks,
Neha

On Fri, Mar 7, 2014 at 3:54 PM, Adam Phelps <a...@opendns.com> wrote:

> I've been trying to write a test consumer in Java for a new use of our
> Kafka cluster (currently used solely with Storm), however this use needs
> to always start from the earliest offset in the topic. From reading
> around it looked like setting "autooffset.reset" = "smallest" would do
> this, however I'm not actually seeing that behavior. So far the only
> way I've managed to force it to start from the beginning is using a new
> groupid with each run, which does not seem like an optimal method.
>
> Am I doing this incorrectly, or is there some other means of doing this
> from the Java API?
>
> public KafkaUpdateSource(String zkQuorum, String topic, String group) {
>     Properties props = new Properties();
>     props.put("zk.connect", zkQuorum);
>     props.put("zk.connectiontimeout.ms", "100000");
>
>     if (group != null) {
>         props.put("groupid", group);
>     } else {
>         props.put("groupid", "test_group");
>     }
>     props.put("zk.synctime.ms", "200");
>     props.put("autocommit.interval.ms", "1000");
>     props.put("consumer.timeout.ms", "1000");
>
>     // XXX: For some reason the "start from smallest offset" option
>     // doesn't seem to work
>     props.put("autooffset.reset", "smallest");
>
>     ConsumerConnector consumer =
>         Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(topic, new Integer(1));
>     iterator =
>         consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
> }
>
> - Adam
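
To make the explanation in the reply concrete, here is a minimal sketch of the start-up decision it describes. This is a simplified model only, not Kafka's actual code: the class `OffsetResetModel`, its in-memory map (standing in for the per-group offsets kept in ZooKeeper), and the example offsets 0, 120 and 500 are all hypothetical and chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the behavior described in the reply; NOT Kafka's implementation.
class OffsetResetModel {
    // Hypothetical stand-in for the offsets stored per consumer group in ZooKeeper.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    // Record an offset for a group, as autocommit would do while consuming.
    void commit(String group, long offset) {
        committedOffsets.put(group, offset);
    }

    // Models deleting the group's information before starting the consumer.
    void deleteGroup(String group) {
        committedOffsets.remove(group);
    }

    // A stored offset always wins; the reset policy is consulted only when none exists.
    long startingOffset(String group, String resetPolicy, long smallest, long largest) {
        Long stored = committedOffsets.get(group);
        if (stored != null) {
            return stored; // reset policy is ignored here
        }
        return "smallest".equals(resetPolicy) ? smallest : largest;
    }

    public static void main(String[] args) {
        OffsetResetModel model = new OffsetResetModel();

        // First run of the group: nothing stored, so "smallest" applies.
        System.out.println(model.startingOffset("test_group", "smallest", 0L, 500L)); // prints 0

        // Later run: an offset was committed, so the policy is not consulted.
        model.commit("test_group", 120L);
        System.out.println(model.startingOffset("test_group", "smallest", 0L, 500L)); // prints 120

        // After the group's stored offset is deleted, the policy applies again.
        model.deleteGroup("test_group");
        System.out.println(model.startingOffset("test_group", "smallest", 0L, 500L)); // prints 0
    }
}
```

In this model, using a fresh group name on every run and deleting the existing group's stored offset have the same effect: in both cases no stored offset is found, so the reset setting decides where consumption starts.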