You also need to set the starting offset time. So to do what you want, you would need to uncomment this line:

    // spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

I can definitely suggest looking at the storm-kafka source code to see how forceFromStart gets used.
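For what it's worth, here is a minimal sketch of the relevant part of your config with both settings applied. The topic name and ZooKeeper host are taken from your own snippet, and the imports assume the storm-kafka package layout from that release line, so adjust to your version:

    import backtype.storm.spout.SchemeAsMultiScheme;
    import storm.kafka.BrokerHosts;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import storm.kafka.trident.OpaqueTridentKafkaSpout;
    import storm.kafka.trident.TridentKafkaConfig;

    BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

    // As I read the storm-kafka code: forceFromStart tells the spout to
    // ignore the offsets it has committed to ZooKeeper, and startOffsetTime
    // tells it where to start instead. With EarliestTime() that is the
    // beginning of the topic; without setting it, the spout may start from
    // a point you did not intend.
    spoutConf.forceFromStart = true;
    spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);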
On Thu, Nov 6, 2014 at 3:38 PM, Sa Li <[email protected]> wrote:

> Hi, All
>
> I am using TridentKafkaConfig to consume data. I was not setting
>
>     spoutConf.forceFromStart = true;
>
> so I assume I am consuming the data from the latest offset, and it works.
> However, if I set spoutConf.forceFromStart = true; the spout just consumes
> nothing. I thought it should consume the data from the beginning of the
> Kafka stream.
>
> Here is the code:
>
>     BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
>     // TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "PofApiTest");
>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>
>     spoutConf.fetchSizeBytes = 50*1024*1024;
>     spoutConf.bufferSizeBytes = 50*1024*1024;
>     // spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
>     // spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>     // spoutConf.socketTimeoutMs = 1000;
>     // spoutConf.fetchMaxWait = 1000;
>     spoutConf.forceFromStart = true;
>     // spoutConf.maxOffsetBehind = Long.MAX_VALUE;
>     // spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
>     // spoutConf.metricsTimeBucketSizeInSecs = 600;
>
>     OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
>     // TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);
>
>     TridentTopology topology = new TridentTopology();
>
>     topology.newStream("topictestspout", kafkaSpout)
>         // topology.newStream("test", new RandomTupleSpout())
>         // this test shows the kafkaSpout has the overhead that causes the latency
>         .parallelismHint(4)
>         // .shuffle()
>         // .each(new Fields("batchid","word"),
>         .each(new Fields("str"),
>               new JsonObjectParse(),
>               new Fields("userid","event"))
>         .groupBy(new Fields("userid"))
>         .persistentAggregate(PostgresqlState.newFactory(config),
>                              new Fields("userid","event"),
>                              new EventUpdater(),
>                              new Fields("eventword"));
>         // .parallelismHint(6);
>
> Can anyone tell me why I can't consume the data from the beginning?
>
> thanks
>
> Alec
