Hi all,

I am using TridentKafkaConfig to consume data. I was not setting
spoutConf.forceFromStart = true;, so I assume the spout was consuming
from the latest offset, and that works. However, when I do set
spoutConf.forceFromStart = true;, the spout consumes nothing, even
though I thought it would consume from the beginning of the Kafka topic.

Here is the code:
BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
// TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "PofApiTest");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.fetchSizeBytes = 50 * 1024 * 1024;
spoutConf.bufferSizeBytes = 50 * 1024 * 1024;
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// spoutConf.socketTimeoutMs = 1000;
// spoutConf.fetchMaxWait = 1000;
spoutConf.forceFromStart = true;
// spoutConf.maxOffsetBehind = Long.MAX_VALUE;
// spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
// spoutConf.metricsTimeBucketSizeInSecs = 600;

OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

TridentTopology topology = new TridentTopology();
topology.newStream("topictestspout", kafkaSpout)
    // topology.newStream("test", new RandomTupleSpout())
    // this test shows the kafkaSpout overhead that causes the latency
    .parallelismHint(4)
    // .shuffle()
    // .each(new Fields("batchid", "word"),
    .each(new Fields("str"),
          new JsonObjectParse(),
          new Fields("userid", "event"))
    .groupBy(new Fields("userid"))
    .persistentAggregate(PostgresqlState.newFactory(config),
                         new Fields("userid", "event"),
                         new EventUpdater(),
                         new Fields("eventword"));
    // .parallelismHint(6);
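For reference, here is a minimal sketch of the two settings that are usually set together in storm-kafka to replay a topic from its beginning; forceFromStart tells the spout to ignore offsets already committed in ZooKeeper, and startOffsetTime tells it where to start instead (the ZooKeeper address and topic name below are the same placeholders as above):

```java
// Sketch only: configure a Trident Kafka spout to read from the earliest
// offset still retained by the broker, ignoring committed offsets.
BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

// Ignore any consumer offsets previously committed in ZooKeeper...
spoutConf.forceFromStart = true;
// ...and begin from the earliest offset the broker still has.
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
```

Without an explicit startOffsetTime, the spout falls back to whatever default the storm-kafka version in use applies, which may not be the earliest offset.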
Can anyone tell me why I can't consume the data from the beginning?

Thanks,
Alec