you also need to set the starting offset time. So to do what you want, you
would need to uncomment this line:

// spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

I can definitely suggest looking at the storm-kafka source code to see how
forceFromStart gets used

On Thu, Nov 6, 2014 at 3:38 PM, Sa Li <[email protected]> wrote:

> Hi, All
>
> I am using TridentKafkaConfig to consume data, I was not setting
>
> spoutConf.forceFromStart = true;
>
> so I am consuming the data from the latest time I assume, and it works.
> However, if I
> set spoutConf.forceFromStart = true; spout just consumes nothing, I
> thought it should consume the data from the beginning of the Kafka stream.
>
> Here is the code:
>
>   BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
> //              TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk,
> "PofApiTest");
>                 TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk,
> "topictest");
>                 spoutConf.scheme = new SchemeAsMultiScheme(new
> StringScheme());
>
>                 spoutConf.fetchSizeBytes = 50*1024*1024;
>                 spoutConf.bufferSizeBytes = 50*1024*1024;
> //                spoutConf.startOffsetTime =
> kafka.api.OffsetRequest.EarliestTime();
> //                spoutConf.startOffsetTime =
> kafka.api.OffsetRequest.LatestTime();
> //                spoutConf.socketTimeoutMs = 1000;
> //                spoutConf.fetchMaxWait = 1000;
>                spoutConf.forceFromStart = true;
> //                spoutConf.maxOffsetBehind = Long.MAX_VALUE;
> //                spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
> //                spoutConf.metricsTimeBucketSizeInSecs = 600;
>
>                 OpaqueTridentKafkaSpout kafkaSpout = new
> OpaqueTridentKafkaSpout(spoutConf);
> //                TransactionalTridentKafkaSpout kafkaSpout = new
> TransactionalTridentKafkaSpout(spoutConf);
>
>                 TridentTopology topology = new TridentTopology();
>
>
>  topology.newStream("topictestspout", kafkaSpout)
> //                  topology.newStream("test", new RandomTupleSpout())
>    // this test tells the kafkaSpout has the overhead to cause the latency
>                                       .parallelismHint(4)
> //                                      .shuffle()
> //                                      .each(new Fields("batchid","word"),
>                                       .each(new Fields("str"),
>                                             new JsonObjectParse(),
>                                             new Fields("userid","event"))
>                                       .groupBy(new Fields("userid"))
>
> .persistentAggregate(PostgresqlState.newFactory(config), new
> Fields("userid","event"), new EventUpdater(), new Fields( "eventword"));
> //                                      .parallelismHint(6);
>
>
> Can anyone tell me why I can't consume the data from beginning?
>
> thanks
>
>
> Alec
>

Reply via email to