Hi Kafka,

I have a Trident topology in Storm that consumes data from Kafka. I am
seeing an issue with KafkaSpout: it does not consume the very first
uncommitted offset from Kafka.

My Storm version is 1.1.1 and my Kafka version is 0.11.0.0. I have a topic,
say X, with 3 partitions.

I use the following configuration to consume data with KafkaSpout:

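import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.topology.TopologyBuilder;

// PropertyUtil is my application's own configuration helper.
KafkaSpoutConfig<String, String> kafkaConfig =
    KafkaSpoutConfig.builder(PropertyUtil.getStringValue(PropertyUtil.KAFKA_BROKERS),
                             PropertyUtil.getStringValue(PropertyUtil.TOPIC_NAME))
        .setProp(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "4194304")
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, PropertyUtil.getStringValue(PropertyUtil.CONSUMER_ID))
        .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "4194304")
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
        .build();

TopologyBuilder builder = new TopologyBuilder();

// Parallelism hint 3: one spout executor per partition of topic X.
builder.setSpout("spout", new KafkaSpout<>(kafkaConfig), 3);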

These are my test cases:

1. Started the topology with a new consumer (group) ID. On this first run it
reads from the latest offset, as expected.
2. Sent some messages to Kafka; all of them were consumed by my Trident
topology.
3. Stopped my Trident topology.
4. Sent some messages to Kafka (partition_0), for example:
> msg_1
> msg_2
> msg_3
> msg_4
> msg_5

5. Started the topology. KafkaSpout consumed the data starting from msg_2;
msg_1 was never consumed.
6. Stopped the topology.
7. Sent some messages to all partitions (_0, _1, _2), for example:
Partition_0
> msg_6
> msg_7
> msg_8
Partition_1
> msg_9
> msg_10
> msg_11
Partition_2
> msg_12
> msg_13
> msg_14

8. Started the topology. KafkaSpout consumed the following messages:
> msg_7
> msg_8
> msg_10
> msg_11
> msg_13
> msg_14

It skipped the earliest uncommitted message in each partition.

Below is the definition of UNCOMMITTED_LATEST from the Javadoc:

UNCOMMITTED_LATEST means that the kafka spout polls records from the last
committed offset, if any. If no offset has been committed, it behaves as
LATEST.
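For reference, this is how I read that rule, written as a small stdlib-only Java sketch. `firstPollOffset` is a hypothetical helper, not a KafkaSpout method. It assumes Kafka's documented convention that a committed offset is the next offset to consume (last processed offset + 1), and the offsets in `main` are made up; only test case 1 (no commit yet) and test case 5 are modeled.

```java
import java.util.OptionalLong;

public class UncommittedLatestSketch {

    // Hypothetical helper: the offset a consumer should start reading a partition
    // from under UNCOMMITTED_LATEST, as the Javadoc describes it.
    // committedOffset: the group's committed offset for the partition, if any
    //                  (by Kafka convention, the next offset to read).
    // logEndOffset:    the offset the next produced message will receive.
    static long firstPollOffset(OptionalLong committedOffset, long logEndOffset) {
        if (committedOffset.isPresent()) {
            // Resume exactly at the committed offset: nothing already processed is
            // re-read, and nothing uncommitted is skipped.
            return committedOffset.getAsLong();
        }
        // No commit yet for this group: behave like LATEST and read only new messages.
        return logEndOffset;
    }

    public static void main(String[] args) {
        // Test case 1: new consumer ID, no commit, so start at the log end.
        System.out.println(firstPollOffset(OptionalLong.empty(), 10)); // prints 10
        // Test case 5 (hypothetical offsets): committed offset 10, msg_1..msg_5 at 10..14.
        // Expected start is 10, i.e. msg_1.
        System.out.println(firstPollOffset(OptionalLong.of(10), 15)); // prints 10
    }
}
```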

According to the Javadoc, the spout should start reading at the last
committed offset. Instead, it appears to start at the earliest uncommitted
offset + 1, i.e. the start position seems to be off by one.
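To check that the "earliest uncommitted + 1" reading explains all of step 8, here is a small stdlib-only Java model. It assumes Kafka's documented convention that a committed offset is the next offset to consume, and it assumes (my hypothesis, not something I have confirmed in the KafkaSpout source) that the spout seeks to committed + 1. The base offsets, class name and method name are made up for illustration; the absolute offsets do not affect the result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetSkipDemo {

    // Messages a consumer reads from one partition when it starts at startOffset.
    // messages.get(i) is the record at offset baseOffset + i.
    static List<String> readFrom(List<String> messages, long baseOffset, long startOffset) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            if (baseOffset + i >= startOffset) {
                out.add(messages.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical offset of the first step-7 message in each partition;
        // the real values depend on how much earlier steps wrote.
        Map<String, Long> base = new LinkedHashMap<>();
        base.put("partition_0", 15L);
        base.put("partition_1", 7L);
        base.put("partition_2", 9L);

        Map<String, List<String>> step7 = new LinkedHashMap<>();
        step7.put("partition_0", List.of("msg_6", "msg_7", "msg_8"));
        step7.put("partition_1", List.of("msg_9", "msg_10", "msg_11"));
        step7.put("partition_2", List.of("msg_12", "msg_13", "msg_14"));

        List<String> expected = new ArrayList<>();
        List<String> observed = new ArrayList<>();
        for (String p : step7.keySet()) {
            // Everything before step 7 was consumed, so by Kafka's convention the
            // committed offset is the next offset to read: the first step-7 message.
            long committed = base.get(p);
            expected.addAll(readFrom(step7.get(p), committed, committed));
            observed.addAll(readFrom(step7.get(p), committed, committed + 1));
        }
        // [msg_6, msg_7, msg_8, msg_9, msg_10, msg_11, msg_12, msg_13, msg_14]
        System.out.println("seek(committed):     " + expected);
        // [msg_7, msg_8, msg_10, msg_11, msg_13, msg_14]
        System.out.println("seek(committed + 1): " + observed);
    }
}
```

The second list matches exactly what I see in step 8, which is why I suspect the start position rather than my producer.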

Please take a look and let me know if anything is wrong with my tests.

I would appreciate a response even if this turns out not to be an issue.

Thanks,
Senthil