I've been trying to write a test consumer in Java for a new use of our Kafka cluster (currently used solely with Storm). This use needs to always start from the earliest offset in the topic. From reading around, it looked like setting "autooffset.reset" = "smallest" would do this, but I'm not actually seeing that behavior. So far the only way I've managed to force it to start from the beginning is to use a new groupid with each run, which does not seem like an optimal method.
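For concreteness, the new-groupid-per-run workaround amounts to roughly the sketch below. It is only an illustration: the class name FreshGroupConfig, the method build, and the "localhost:2181" address are hypothetical, the property keys are the ones used in this post, and the code relies only on java.util rather than on the Kafka API itself.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper (not part of Kafka): builds consumer properties whose
// group id differs on every call.
class FreshGroupConfig {

    static Properties build(String zkQuorum, String baseGroup) {
        Properties props = new Properties();
        props.put("zk.connect", zkQuorum);

        // Fall back to the same default group name used in this post.
        String prefix = (baseGroup != null) ? baseGroup : "test_group";

        // A random suffix gives each run a group id that has not been used
        // before, so presumably no offset has been stored for it yet.
        props.put("groupid", prefix + "-" + UUID.randomUUID());

        // Same setting as in the post; it only seems to take effect when the
        // group id is new.
        props.put("autooffset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:2181" is a placeholder ZooKeeper address.
        Properties props = build("localhost:2181", null);
        System.out.println(props.getProperty("groupid"));
    }
}
```

The resulting Properties would be passed to new ConsumerConfig(props) in place of the hand-built one. The obvious downside is that every run introduces yet another group id, which is part of why this does not feel like the right approach.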
Am I doing this incorrectly, or is there some other means of doing this from the Java API?

    public KafkaUpdateSource(String zkQuorum, String topic, String group) {
        Properties props = new Properties();
        props.put("zk.connect", zkQuorum);
        props.put("zk.connectiontimeout.ms", "100000");
        if (group != null) {
            props.put("groupid", group);
        } else {
            props.put("groupid", "test_group");
        }
        props.put("zk.synctime.ms", "200");
        props.put("autocommit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "1000");

        // XXX: For some reason the "start from smallest offset" option doesn't
        // seem to work
        props.put("autooffset.reset", "smallest");

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        iterator = consumer.createMessageStreams(topicCountMap).get(topic).get(0).iterator();
    }

- Adam