I see this in the consumer logs:

[kafka.consumer.ConsumerFetcherManager] [ConsumerFetcherManager-1397188062631] Adding fetcher for partition [taf.referral.emails.service,11], initOffset 250 to broker 1 with fetcherId 0

but no data arrives, and I get this warning:
[ConsumerFetcherThread-group1_ip-10-91-35-43-1397188061429-b5ff1205-0-1] [kafka.consumer.ConsumerFetcherThread] [ConsumerFetcherThread-group1_ip-10-91-35-43-1397188061429-b5ff1205-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 73; ClientId: group1-ConsumerFetcherThread-group1_ip-10-91-35-43-1397188061429-b5ff1205-0-1; ReplicaId: -1; MaxWait: 180000 ms; MinBytes: 1 bytes; RequestInfo: [taf.referral.emails.service,0] -> PartitionFetchInfo(253,10485760)
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
    at kafka.utils.Utils$.read(Utils.scala:395)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Thanks
Arjun Narasimha Kota
On Friday 11 April 2014 09:51 AM, Arjun wrote:
I hope this gives you a better idea.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
group1 --zkconnect zkhost:port --topic testtopic
Group   Topic      Pid  Offset  logSize  Lag  Owner
group1  testtopic  0    253     253      0    group1_ip-xx-1397188061429-b5ff1205-0
group1  testtopic  1    267     267      0    group1_ip-xx-1397188061429-b5ff1205-0
group1  testtopic  2    254     254      0    group1_ip-xx-1397188061429-b5ff1205-0
group1  testtopic  3    265     265      0    group1_ip-xx-1397188061429-b5ff1205-0
group1  testtopic  4    261     261      0    group1_ip-xx-1397188061429-b5ff1205-1
group1  testtopic  5    294     294      0    group1_ip-xx-1397188061429-b5ff1205-1
group1  testtopic  6    248     248      0    group1_ip-xx-1397188061429-b5ff1205-1
group1  testtopic  7    271     271      0    group1_ip-xx-1397188061429-b5ff1205-1
group1  testtopic  8    240     240      0    group1_ip-xx-1397188061429-b5ff1205-2
group1  testtopic  9    261     261      0    group1_ip-xx-1397188061429-b5ff1205-2
group1  testtopic  10   290     290      0    group1_ip-xx-1397188061429-b5ff1205-2
group1  testtopic  11   250     251      1    group1_ip-xx-1397188061429-b5ff1205-2
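To read the checker output: Lag is the log end offset (logSize) minus the consumer's committed offset, so partition 11 above has exactly one unconsumed message (251 - 250). A minimal sketch of that arithmetic (the LagCheck class is a hypothetical illustration, not part of Kafka):

```java
// Hypothetical helper mirroring how ConsumerOffsetChecker's Lag column
// is derived: lag = log end offset (logSize) - committed consumer offset.
public class LagCheck {
    static long lag(long logSize, long committedOffset) {
        return logSize - committedOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(251, 250)); // partition 11 above -> 1
        System.out.println(lag(253, 253)); // partition 0 above  -> 0
    }
}
```

A lag of 0 on the other partitions means the consumer threads are fully caught up there; only partition 11 is holding the one unread message.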
As you can see in the output, the lag on the last partition is 1. I sent
just one message. The topic is not new; as you can see, plenty of messages
have accumulated since yesterday. That single message is never consumed by
the consumer, but if I send some 10 messages, all of them are consumed.
Please let me know if I need to change any consumer properties.
My consumer properties are:
fetch.wait.max.ms=180000
fetch.min.bytes=1
auto.offset.reset=smallest
auto.commit.enable=false
fetch.message.max.bytes=1048576
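One plausible reading of the SocketTimeoutException above: the failing FetchRequest shows MaxWait: 180000 ms, which comes from fetch.wait.max.ms, while the 0.8 consumer's socket.timeout.ms defaults to 30000 ms. If the socket timeout is shorter than the long-poll wait, the blocking read can give up before the broker responds. A sketch of a consumer.properties that keeps the two in a safe relationship (values are illustrative, not a verified fix):

```properties
# Sketch only: keep fetch.wait.max.ms below socket.timeout.ms so the
# blocking fetch does not time out while the broker is still long-polling.
fetch.wait.max.ms=30000
socket.timeout.ms=60000

# Settings from the thread, unchanged:
fetch.min.bytes=1
auto.offset.reset=smallest
auto.commit.enable=false
fetch.message.max.bytes=1048576
```

With fetch.min.bytes=1, a single message should satisfy the fetch as soon as it arrives, so shortening the wait should not delay delivery in practice.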
Thanks
Arjun Narasimha Kota
On Friday 11 April 2014 06:23 AM, Arjun Kota wrote:
The consumer uses specific topics.
On Apr 11, 2014 6:23 AM, "Arjun Kota" <[email protected]> wrote:
Yes, the message shows up on the server.
On Apr 11, 2014 12:07 AM, "Guozhang Wang" <[email protected]> wrote:
Hi Arjun,
If you only send one message, does that message show up on the server?
Does your consumer use wildcard topics or specific topics?
Guozhang
On Thu, Apr 10, 2014 at 9:20 AM, Arjun <[email protected]> wrote:
> But we have auto.offset.reset set to smallest, not largest; does this
> issue arise even then? If so, is there any workaround?
>
> Thanks
> Arjun Narasimha Kota
>
>
> On Thursday 10 April 2014 09:39 PM, Guozhang Wang wrote:
>
>> It could be https://issues.apache.org/jira/browse/KAFKA-1006.
>>
>> Guozhang
>>
>>
>> On Thu, Apr 10, 2014 at 8:50 AM, Arjun <[email protected]> wrote:
>>
>>> It's auto-created, but even after the topic is created, this is the scenario.
>>>
>>> Arjun
>>>
>>> On Thursday 10 April 2014 08:41 PM, Guozhang Wang wrote:
>>>
>>>> Hi Arjun,
>>>>
>>>> Did you manually create the topic or use auto.topic.creation?
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Apr 10, 2014 at 7:39 AM, Arjun <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a 3-node Kafka 0.8 setup with a ZooKeeper ensemble. We use the
>>>>> high-level consumer with auto-commit of offsets disabled. I am facing
>>>>> a peculiar problem with Kafka. When I send some 10-20 messages, the
>>>>> consumer starts to consume them. But if I send only one message to
>>>>> Kafka, then even though the consumer is active, it does not try to
>>>>> fetch the message. There is nothing in the logs; the messages are
>>>>> simply not being fetched by the consumer. The messages are there on
>>>>> the Kafka server. Can someone let me know where I am going wrong?
>>>>>
>>>>> Thanks
>>>>> Arjun Narasimha Kota
--
-- Guozhang