[ https://issues.apache.org/jira/browse/KAFKA-4670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15830670#comment-15830670 ]
Roger Hoover commented on KAFKA-4670: ------------------------------------- Ah, yes, thanks, [~ijuma]. > Kafka Consumer should validate FetchResponse > -------------------------------------------- > > Key: KAFKA-4670 > URL: https://issues.apache.org/jira/browse/KAFKA-4670 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.10.2.0 > Reporter: Roger Hoover > Assignee: Jason Gustafson > Priority: Minor > > As a negative test case, I purposefully configured a bad advertised listener > endpoint. > {code} > advertised.listeners=PLAINTEXT://www.google.com:80 > {code} > This causes the Consumer to over-allocate and run out of memory. > {quote} > [2017-01-18 10:03:03,866] DEBUG Sending metadata request > (type=MetadataRequest, topics=foo) to node -1 > (org.apache.kafka.clients.NetworkClient) > [2017-01-18 10:03:03,870] DEBUG Updated cluster metadata version 2 to > Cluster(id = oerqPfCuTCKYUUaWdFUSVQ, nodes = [www.google.com:80 (id: 0 rack: > null)], partitions = [Partition(topic = foo, partition = 0, leader = 0, > replicas = [0], isr = [0])]) (org.apache.kafka.clients.Metadata) > [2017-01-18 10:03:03,871] DEBUG Received group coordinator response > ClientResponse(receivedTimeMs=1484762583870, latencyMs=88, > disconnected=false, > requestHeader={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, > > responseBody={error_code=0,coordinator={node_id=0,host=www.google.com,port=80}}) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2017-01-18 10:03:03,871] INFO Discovered coordinator www.google.com:80 (id: > 2147483647 rack: null) for group console-consumer-64535. > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2017-01-18 10:03:03,871] DEBUG Initiating connection to node 2147483647 at > www.google.com:80. (org.apache.kafka.clients.NetworkClient) > [2017-01-18 10:03:03,915] INFO Revoking previously assigned partitions [] for > group console-consumer-64535 > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > [2017-01-18 10:03:03,915] INFO (Re-)joining group console-consumer-64535 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2017-01-18 10:03:03,917] DEBUG Sending JoinGroup ((type: JoinGroupRequest, > groupId=console-consumer-64535, sessionTimeout=10000, > rebalanceTimeout=300000, memberId=, protocolType=consumer, > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@564fabc8)) > to coordinator www.google.com:80 (id: 2147483647 rack: null) > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [2017-01-18 10:03:03,932] DEBUG Created socket with SO_RCVBUF = 66646, > SO_SNDBUF = 131874, SO_TIMEOUT = 0 to node 2147483647 > (org.apache.kafka.common.network.Selector) > [2017-01-18 10:03:03,932] DEBUG Completed connection to node 2147483647. > Fetching API versions. (org.apache.kafka.clients.NetworkClient) > [2017-01-18 10:03:03,932] DEBUG Initiating API versions fetch from node > 2147483647. (org.apache.kafka.clients.NetworkClient) > [2017-01-18 10:03:03,990] ERROR Unknown error when running consumer: > (kafka.tools.ConsoleConsumer$) > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) > at > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) > at org.apache.kafka.common.network.Selector.poll(Selector.java:303) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:346) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:331) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:300) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1025) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:990) > at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:55) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > {quote} > It seems like the consumer should validate responses better? It could check > that the version is present/valid before trusting the messageset size bytes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)