Dale Jin created KAFKA-4086:
-------------------------------

             Summary: long processing consumer restart will stall
                 Key: KAFKA-4086
                 URL: https://issues.apache.org/jira/browse/KAFKA-4086
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.10.0.0
            Reporter: Dale Jin

[~hachikuji]

We have a long-processing consumer. Whenever a new consumer tries to join the group while the long-processing consumer is busy processing, the new consumer stalls. If we then kill the long-processing consumer and restart it, both consumers stall.

When we kill the long-processing consumer, it tries to issue a LeaveGroup request, but the request fails, seemingly because of the client request timeout.

When we start the long-processing consumer again, it appears to send a topic metadata request to the broker. The subsequent JoinGroup request is then issued and returns a future, but when I check the server log I don't see the corresponding request in kafka-request.log.

We construct the consumer with the following code (based on the kafka-python library):

{code}
from kafka import KafkaConsumer

# Inside our consumer wrapper class; offsets are committed manually.
self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                              value_deserializer=deserializer,
                              group_id=self.user_defined_sub_name,
                              heartbeat_interval_ms=10000,
                              session_timeout_ms=300000,
                              enable_auto_commit=False)
{code}

On the server side, we use 0.10.0.0 with default settings.

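One detail that may be worth checking: the constructor above does not set `request_timeout_ms`, and the client log below reports a request timeout of 40000 ms, well under our 300000 ms session timeout. The following is only a sketch of a sanity check for that relationship. `timeout_warnings` is a hypothetical helper (not part of kafka-python), and the rule that the request timeout should exceed the session timeout is an assumption borrowed from the Java consumer's configuration validation, not a confirmed root cause.

```python
def timeout_warnings(session_timeout_ms, heartbeat_interval_ms, request_timeout_ms):
    """Return warnings for timeout combinations that look suspicious.

    Hypothetical helper for illustration only; the rules below are
    assumptions, not checks performed by kafka-python itself.
    """
    warnings = []

    # Assumption: a JoinGroup request can be held by the coordinator for up to
    # the session timeout while a rebalance is pending, so a shorter request
    # timeout could make the client give up first.
    if request_timeout_ms <= session_timeout_ms:
        warnings.append(
            "request_timeout_ms (%d) is not greater than session_timeout_ms (%d)"
            % (request_timeout_ms, session_timeout_ms)
        )

    # Common guideline: heartbeat interval at most one third of the session timeout.
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        warnings.append(
            "heartbeat_interval_ms (%d) is more than 1/3 of session_timeout_ms (%d)"
            % (heartbeat_interval_ms, session_timeout_ms)
        )

    return warnings


if __name__ == "__main__":
    # Values from this report; 40000 ms is the timeout shown in the client log.
    for warning in timeout_warnings(
        session_timeout_ms=300000,
        heartbeat_interval_ms=10000,
        request_timeout_ms=40000,
    ):
        print(warning)
```

With the values from this report, the check flags only the request timeout. Whether raising `request_timeout_ms` above `session_timeout_ms` actually avoids the stall is something we have not verified.
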
Looks like a `RebalanceInProgressError` is thrown:

{code}
2016-08-22 20:39:08,984 - kafka.coordinator - INFO - Discovered coordinator 100 for group v1.user.queue
2016-08-22 20:39:08,984 - kafka.coordinator.consumer - INFO - Revoking previously assigned partitions set() for group v1.user.queue
2016-08-22 20:39:08,990 - kafka.cluster - DEBUG - Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 1)
2016-08-22 20:39:08,990 - kafka.coordinator - INFO - (Re-)joining group v1.user.queue
2016-08-22 20:39:08,990 - kafka.coordinator - DEBUG - Sending JoinGroup (JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])) to coordinator 100
2016-08-22 20:39:08,991 - kafka.conn - DEBUG - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> Request 5: JoinGroupRequest_v0(group='v1.user.queue', session_timeout=300000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x01\x00\x1av1.messagingtest.user_info\x00\x00\x00\x00')])
2016-08-22 20:43:04,576 - kafka.conn - WARNING - <BrokerConnection host=10.128.64.81/10.128.64.81 port=9092> timed out after 40000 ms. Closing connection.
2016-08-22 20:43:04,576 - kafka.client - WARNING - Node 100 connection failed -- refreshing metadata
2016-08-22 20:43:04,576 - kafka.coordinator - ERROR - Error sending JoinGroupRequest_v0 to node 100 [Error 7 RequestTimedOutError: Request timed out after 40000 ms]
2016-08-22 20:43:04,576 - kafka.coordinator - WARNING - Marking the coordinator dead (node 100) for group v1.user.queue: None.
2016-08-22 20:43:04,678 - kafka.coordinator - DEBUG - Sending group coordinator request for group v1.user.queue to broker 100
{code}

FYI, we turned on the following in log4j:

{code}
log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=true
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=true
log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=true
log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=true
{code}

Please let us know how we can proceed to find the root cause.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)