[ https://issues.apache.org/jira/browse/KAFKA-1475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14017341#comment-14017341 ]

Hang Qi commented on KAFKA-1475:
--------------------------------

Hi Guozhang,

Thanks for your reply. In that case, we will have to add a health check at the 
application level, such as counting how many messages are received within a 
certain period.
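
Below is a minimal sketch of such an application-level check, assuming the 
application drives the consumption loop itself; the class and method names are 
illustrative, not part of the 0.8 consumer API.

    import java.util.concurrent.atomic.AtomicLong
    import java.util.concurrent.{Executors, TimeUnit}

    // Counts consumed messages and periodically checks that the count advanced;
    // if it did not, the stream is considered stalled and a callback fires.
    class ConsumptionWatchdog(interval: Long = 5, unit: TimeUnit = TimeUnit.MINUTES) {
      private val counter = new AtomicLong(0)
      @volatile private var lastCount = 0L
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      // call this from the KafkaStream iterator loop for every message consumed
      def recordMessage(): Unit = counter.incrementAndGet()

      def start(onStalled: () => Unit): Unit =
        scheduler.scheduleAtFixedRate(new Runnable {
          def run(): Unit = {
            val current = counter.get()
            if (current == lastCount) onStalled()  // no progress during the interval
            lastCount = current
          }
        }, interval, interval, unit)
    }

Calling recordMessage() from the iterator loop and alerting (or bouncing the 
consumer) from onStalled would at least make a silent stall visible to the 
application.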

I am still very curious about step 3 and want to dig out the root cause. I've 
collected the ZkClient logs on the consumer side; unfortunately, the logs on 
the ZK server side have already been purged. Even so, the client-side logs 
show something interesting.

The sequence of events was as follows:
1. 07:55:14 the ZK client got a session expiry and propagated the event to the 
watcher, so the Kafka client tried to recreate the ephemeral node 
/kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee.
2. 07:55:14 the ZK sender connected to mbus0002 and sent a connect request.
3. 07:55:26 the ZK sender got no response to the connect request from the 
server, so it closed the connection and tried to reconnect.
4. 07:55:40 the ZK sender established a connection with mbus0005 and got 
session 0x545f6dc6f510757.
5. 07:55:40 the ZK sender got the response for the ephemeral-node create; the 
ZK server responded "node exists" (response code -110).
6. 07:55:40 the Kafka client wanted to read 
/kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee; 
the ZK sender sent the request and read the response, and the Kafka client 
noticed that the data content of the node is the same as what it wants to write 
(#7b20227061747465726e223a22737461746963222c2022737562736372697074696f6e223a7b202253746f70416c6c5f55532d45415354223a2031207d2c202274696d657374616d70223a2231343030383436313134383435222c202276657273696f6e223a31207d).
Note that the owner of the ephemeral node in the stat is 163808312244176699 = 
0x245f6cac692073b 
(s{150396710990,150396710990,1400846114866,1400846114866,0,0,0,163808312244176699,105,0,150396710990}), 
which is neither the current session nor the session ID from before the 
expiry (0x345f6cac6ed071d); see the sketch after this list.
7. 07:56:07 the Kafka client's read of the ephemeral node failed; ZK responded 
"node does not exist".
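
Below is a minimal sketch of how the step-6 check can be reproduced against a 
raw org.apache.zookeeper.ZooKeeper handle (not the 0.8 consumer code itself); 
the function name is illustrative, and `zk` is assumed to be an 
already-connected client.

    import org.apache.zookeeper.ZooKeeper
    import org.apache.zookeeper.data.Stat

    def inspectEphemeralOwner(zk: ZooKeeper, path: String): Unit = {
      val stat = new Stat()
      val data = zk.getData(path, false, stat)   // fills `stat` as a side effect
      val payload = new String(data, "UTF-8")    // the consumer registration JSON
      println(s"data            = $payload")
      println(f"owner session   = 0x${stat.getEphemeralOwner}%x")
      println(f"current session = 0x${zk.getSessionId}%x")
      // In this incident the owner was 0x245f6cac692073b, which matched neither
      // the current session (0x545f6dc6f510757) nor the pre-expiry session
      // (0x345f6cac6ed071d), even though the data matched what we meant to write.
    }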

One more thing: another ZK client also read the same content and stat for the 
path 
/kafka8Agg/consumers/myconsumergroup/ids/myconsumergroup_ooo0001-1400815740329-5055aeee; 
the ZK server it connected to was mbus0002.

The weird thing is that there is no entry for session ID 0x245f6cac692073b 
anywhere in the log. So my gut feeling is that, between 07:55:14 and 07:55:26, 
the ZK client somehow sent the create request to mbus0002 and mbus0002 
processed it successfully, but the response was never read by the client; 
session 0x245f6cac692073b was created at that time.

However, checking the ZK client code, this should not happen: the ZK client is 
supposed to send requests to the server only when it is in the SyncConnected 
state.

Do you have any ideas or comments about this?

Based on this issue, I feel it is safer to check the session ID of the 
ephemeral node rather than the timestamp when recovering from a ZK session 
expiry.
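
A minimal sketch of that check, assuming a raw ZooKeeper handle rather than the 
ZkClient wrapper that utils/ZkUtils.scala actually uses; the helper name and 
error handling are illustrative only.

    import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

    // On session expiry, recreate the registration node; if it already exists,
    // trust it only when its ephemeral owner is the current session.
    def recreateRegistration(zk: ZooKeeper, path: String, data: Array[Byte]): Unit = {
      try {
        zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
      } catch {
        case _: KeeperException.NodeExistsException =>
          val stat = zk.exists(path, false)
          if (stat == null) {
            recreateRegistration(zk, path, data)  // conflicting node vanished; retry
          } else if (stat.getEphemeralOwner == zk.getSessionId) {
            // the node already belongs to the current session; nothing to do
          } else {
            // owned by some other (possibly stale) session: surface the conflict
            // instead of assuming identical data/timestamp means the node is ours
            throw new IllegalStateException(
              f"$path is owned by session 0x${stat.getEphemeralOwner}%x")
          }
      }
    }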

After all, as you said, the Kafka 0.8 consumer will be obsolete anyway. But I 
want to call this out because the same code in utils/ZkUtils.scala is shared 
with the broker.

Thanks
Hang Qi

> Kafka consumer stops LeaderFinder/FetcherThreads, but application does not 
> know
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-1475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1475
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 0.8.0
>         Environment: linux, rhel 6.4
>            Reporter: Hang Qi
>            Assignee: Neha Narkhede
>              Labels: consumer
>         Attachments: 5055aeee-zk.txt
>
>
> We encountered an issue of consumers not consuming messages in production. 
> (This consumer has its own consumer group and consumes just one topic with 3 
> partitions.)
> Based on the logs, we have the following findings:
> 1. The ZooKeeper session expired; the Kafka high-level consumer detected this 
> event, released the old broker partition ownership, and re-registered the 
> consumer.
> 2. Upon creating the ephemeral path in ZooKeeper, it found that the path 
> still existed and tried to read the content of the node.
> 3. After reading back the content, it found the content was the same as what 
> it was going to write, so it logged "[ZkClient-EventThread-428-ZK/kafka] 
> (kafka.utils.ZkUtils$) - 
> /consumers/consumerA/ids/consumerA-1400815740329-5055aeee exists with value { 
> "pattern":"static", "subscription":{ "TOPIC": 1}, 
> "timestamp":"1400846114845", "version":1 } during connection loss; this is 
> ok" and did nothing.
> 4. After that, it threw an exception indicating that the cause was 
> "org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/consumerA/ids/consumerA-1400815740329-5055aeee" during 
> rebalance. 
> 5. After all retries failed, it gave up, leaving the LeaderFinderThread and 
> FetcherThread stopped. 
> Step 3 looks very weird; checking the code, there is a timestamp contained in 
> the stored data, so it may be caused by a ZooKeeper issue.
> But what I am wondering is whether it is possible to let the application 
> (Kafka client users) know that the underlying LeaderFinderThread and 
> FetcherThread are stopped, for example by allowing the application to 
> register a callback or by throwing an exception (by invalidating the 
> KafkaStream iterator, for example). For me, it is not reasonable for the 
> Kafka client to shut down everything and wait for the next rebalance while 
> letting the application wait on iterator.hasNext() without knowing that 
> something is wrong underneath.
> I've read the wiki about the Kafka 0.9 consumer rewrite, and there is a 
> ConsumerRebalanceCallback interface, but I am not sure how long it will take 
> to be ready, or how long it will take for us to migrate. 
> Please help look into this issue. Thanks very much!



--
This message was sent by Atlassian JIRA
(v6.2#6252)
