The setup has 3 Kafka brokers running on 3 different EC2 nodes (I added host.name to each broker's config). I am not committing any messages in my consumer. The consumer is an exact replica of the ConsumerGroupExample.

The test machine (10.60.15.123) is outside these systems' security group, but all ports, both TCP and UDP, are open.

If I run the same code on any system within the same security group, things work fine. I feel this has something to do with the EC2 node setup.
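For reference, a minimal sketch of the consumer side, assuming it follows the 0.8 high-level consumer pattern of ConsumerGroupExample (the ZooKeeper addresses, group id, and topic name are the ones appearing in this thread; the class name and everything else is illustrative):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect",
                "10.235.39.219:2181,10.249.171.5:2181,10.243.42.35:2181");
        props.put("group.id", "group1");
        // auto.commit.enable defaults to true; nothing below commits offsets manually
        props.put("auto.commit.enable", "true");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ask for a single stream (thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("taf.referral.emails.service", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);

        // iterate the stream and print each message payload
        ConsumerIterator<byte[], byte[]> it =
                streams.get("taf.referral.emails.service").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}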

Consumer offset:

ubuntu@ip-10-235-39-219:~/kafka/new/kafka_2.8.0-0.8.0$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --zkconnect 10.235.39.219:2181,10.249.171.5:2181,10.243.42.35:2181 --topic taf.referral.emails.service
Group   Topic                        Pid  Offset  logSize  Lag  Owner
group1  taf.referral.emails.service  0    6       6        0    none
group1  taf.referral.emails.service  1    3       3        0    none
group1  taf.referral.emails.service  2    3       3        0    none


Kafka.log:

[2014-02-13 02:15:09,837] TRACE Processor id 0 selection time = 300 ms (kafka.network.Processor)
[2014-02-13 02:15:09,916] DEBUG Accepted connection from /10.60.15.123 on /10.235.39.219:9092. sendBufferSize [actual|requested]: [131071|1048576] recvBufferSize [actual|requested]: [131071|1048576] (kafka.network.Acceptor)
[2014-02-13 02:15:09,916] TRACE Processor id 0 selection time = 79 ms (kafka.network.Processor)
[2014-02-13 02:15:09,917] DEBUG Processor 0 listening to new connection from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE Processor id 0 selection time = 55 ms (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE 195 bytes read from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,997] DEBUG Got ping response for sessionid: 0x2442a181bfb0000 after 1ms (org.apache.zookeeper.ClientCnxn)
[2014-02-13 02:15:10,036] DEBUG [FetchRequestPurgatory-0] Expiring fetch request Name: FetchRequest; Version: 0; CorrelationId: 54; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 2; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [taf.referral.emails.service,1] -> PartitionFetchInfo(3,1048576). (kafka.server.KafkaApis$FetchRequestPurgatory)



Kafka request log:

[2014-02-13 02:15:09,837] TRACE Processor id 0 selection time = 300 ms (kafka.network.Processor)
[2014-02-13 02:15:09,916] TRACE Processor id 0 selection time = 79 ms (kafka.network.Processor)
[2014-02-13 02:15:09,917] DEBUG Processor 0 listening to new connection from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE Processor id 0 selection time = 55 ms (kafka.network.Processor)
[2014-02-13 02:15:09,972] TRACE 195 bytes read from /10.60.15.123:45056 (kafka.network.Processor)
[2014-02-13 02:15:09,973] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: group1-ConsumerFetcherThread-group1_ec2-54-225-44-248.compute-1.amazonaws.com-1392275707952-31ee2fed-0-0; ReplicaId: -1; MaxWait: 1000000 ms; MinBytes: 100 bytes; RequestInfo: [taf.referral.emails.service,2] -> PartitionFetchInfo(3,1048576),[taf.referral.emails.service,1] -> PartitionFetchInfo(3,1048576) (kafka.network.RequestChannel$)
[2014-02-13 02:15:09,973] TRACE [KafkaApi-0] Handling request: Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: group1-ConsumerFetcherThread-group1_ec2-54-225-44-248.compute-1.amazonaws.com-1392275707952-31ee2fed-0-0; ReplicaId: -1; MaxWait: 1000000 ms; MinBytes: 100 bytes; RequestInfo: [taf.referral.emails.service,2] -> PartitionFetchInfo(3,1048576),[taf.referral.emails.service,1] -> PartitionFetchInfo(3,1048576) from client: /10.60.15.123:45056 (kafka.server.KafkaApis)
[2014-02-13 02:15:09,973] TRACE [KafkaApi-0] Fetching log segment for topic, partition, offset, size = (taf.referral.emails.service,2,3,1048576) (kafka.server.KafkaApis)
[2014-02-13 02:15:09,974] TRACE [KafkaApi-0] Fetching log segment for topic, partition, offset, size = (taf.referral.emails.service,1,3,1048576) (kafka.server.KafkaApis)
[2014-02-13 02:15:09,974] DEBUG [KafkaApi-0] Putting fetch request with correlation id 0 from client group1-ConsumerFetcherThread-group1_ec2-54-225-44-248.compute-1.amazonaws.com-1392275707952-31ee2fed-0-0 into purgatory (kafka.server.KafkaApis)




On Wednesday 12 February 2014 09:26 PM, Jun Rao wrote:
Interesting. So you have 4 messages in the broker. The checkpointed offset
for the consumer is at the 3rd message. Did you change the default setting
of auto.commit.enable? Also, if you look at the
request log, what's the offset in the fetch request from this consumer?
Thanks,
Jun
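Since auto.commit.enable comes up here, a short sketch of the two ways the checkpointed offset advances with the 0.8 high-level consumer (props and connector as in the sketch near the top of the thread; the interval shown is just the default):

// auto.commit.enable=true (the default): the connector checkpoints the offsets
// of consumed messages to ZooKeeper every auto.commit.interval.ms.
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "60000");

// With auto.commit.enable=false the application has to commit explicitly,
// otherwise the offset seen by ConsumerOffsetChecker never moves:
connector.commitOffsets();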


On Tue, Feb 11, 2014 at 10:07 PM, Arjun <ar...@socialtwist.com> wrote:

The topic name is correct; the output of the ConsumerOffsetChecker is:

arjunn@arjunn-lt:~/Downloads/Kafka0.8/new/kafka_2.8.0-0.8.0$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --zkconnect 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic taf.referral.emails.service
Group   Topic                        Pid  Offset  logSize  Lag  Owner
group1  taf.referral.emails.service  0    2       4        2    group1_arjunn-lt-1392133080519-e24b249b-0
group1  taf.referral.emails.service  1    2       4        2    group1_arjunn-lt-1392133080519-e24b249b-0

thanks
Arjun Narasimha Kota




On Wednesday 12 February 2014 10:21 AM, Jun Rao wrote:

Could you double check that you used the correct topic name? If so, could
you run ConsumerOffsetChecker as described in
https://cwiki.apache.org/confluence/display/KAFKA/FAQ and see if there is
any lag?

Thanks,

Jun


On Tue, Feb 11, 2014 at 8:45 AM, Arjun Kota <ar...@socialtwist.com>
wrote:

  fetch.wait.max.ms=10000
fetch.min.bytes=128

My message size is much more than that.
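As a reference point, this is roughly how those two values reach the 0.8 high-level consumer: the broker answers a fetch once fetch.min.bytes are available or fetch.wait.max.ms elapses, whichever comes first (the ZooKeeper address and group id below are placeholders; the rest follows the sketch near the top of the thread):

Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:2181");   // placeholder
props.put("group.id", "group1");
// hold each fetch until 128 bytes are available or 10 s pass
props.put("fetch.wait.max.ms", "10000");
props.put("fetch.min.bytes", "128");
ConsumerConfig config = new ConsumerConfig(props);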
On Feb 11, 2014 9:21 PM, "Jun Rao" <jun...@gmail.com> wrote:

  What's the fetch.wait.max.ms and fetch.min.bytes you used?
Thanks,

Jun


On Tue, Feb 11, 2014 at 12:54 AM, Arjun <ar...@socialtwist.com> wrote:

With the same group id, the console consumer is working fine.

On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:

Arjun,

Are you using the same group name for the console consumer and the Java consumer?

Guozhang


On Mon, Feb 10, 2014 at 11:38 PM, Arjun <ar...@socialtwist.com> wrote:
Hi Jun,
No, it's not that problem. I am not able to figure out what the problem is; can you please help?

thanks
Arjun Narasimha Kota


On Monday 10 February 2014 09:10 PM, Jun Rao wrote:

Does https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata? apply?
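The usual suspect behind that FAQ entry is the starting offset: a group with no checkpointed offset starts from the largest offset by default, so anything produced before the consumer first connected is skipped. A hedged sketch of the relevant setting (the group id here is hypothetical):

// "largest" (default) skips messages produced before the group's first connect;
// "smallest" makes a group with no committed offset start from the beginning.
props.put("group.id", "group1-fresh");        // hypothetical fresh group id
props.put("auto.offset.reset", "smallest");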

Thanks,

Jun


On Sun, Feb 9, 2014 at 10:27 PM, Arjun <ar...@socialtwist.com> wrote:
Hi,
I started using Kafka some time back and have been experimenting with 0.8. My problem is that my consumer is unable to consume the messages. My configuration is a Kafka broker on localhost and ZooKeeper on localhost. I have only one broker and one consumer at present.

What have I done:
        1) I used the Java examples in the Kafka source and pushed some 600 messages to the broker.
        2) I used the console consumer to check whether the messages are in the broker or not. The console consumer printed all 600 messages.
        3) Then I used the Java consumer code and tried to get those messages. It does not print any messages; it just gets stuck (see the sketch below).
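A hedged diagnostic for the hang in step 3: by default the stream iterator blocks forever when no messages arrive, but a finite consumer.timeout.ms makes hasNext() throw instead, which at least makes the stall visible (the timeout value is illustrative; the rest follows the sketch near the top of the thread):

// add to the consumer properties before creating the connector
props.put("consumer.timeout.ms", "30000");    // illustrative 30 s timeout

try {
    while (it.hasNext()) {
        System.out.println(new String(it.next().message()));
    }
} catch (kafka.consumer.ConsumerTimeoutException e) {
    System.out.println("no message received within 30 s");
}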

When was it working earlier:
        - When I tried with three brokers and three consumers on the same machine, with the same configuration, it worked fine.
        - I changed the properties accordingly when I tried to make it work with one broker and one consumer.

What do the logs say:
        - I am attaching the logs as well.

If someone points out where I am going wrong, it would be helpful.

Thanks
Arjun Narasimha Kota





