[ https://issues.apache.org/jira/browse/KAFKA-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17765597#comment-17765597 ]
Vikash Mishra edited comment on KAFKA-15160 at 9/15/23 12:11 PM:
-----------------------------------------------------------------

Hey [~phuctran], thanks for looking into this.

Yes, we noticed this for every ConsumerRecord.

To create the ConsumerRecords:
# Create a proto-encoded message of roughly 300 KB.
# Enable compression (any codec, snappy or gzip).
# Publish it to a Kafka topic.

Kafka listener:
# It is a simple Java Spring Kafka listener, which uses the Kafka listener behind the scenes (topic and other configs removed):

@KafkaListener(topics = "topic-name", containerFactory = CONSUMER_LISTENER_FACTORY, groupId = groupId)
public void consume(List<ConsumerRecord<String, byte[]>> records, Acknowledgment ack) {
    this.consume(records, ack);
}

I have attached the dump as well. It is compressed with 7-Zip because of the upload size restriction. Please uncompress it and analyze it with the tool of your choice; we used Eclipse Memory Analyzer. We also used the IntelliJ IDE and noticed the same issue there.
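The producer side of the steps above could be set up roughly as follows. This is a minimal sketch rather than the reporter's actual code: the class name, the broker address `localhost:9092`, and the repeating-pattern payload (a stand-in for a real proto-encoded message) are assumptions. It uses only `java.util.Properties` with the standard Kafka producer config keys, so it runs without a broker; the commented lines show where `kafka-clients` would take over.

```java
import java.util.Properties;

public class CompressedProducerSetup {

    // Roughly the message size mentioned in the comment (300 KB).
    static final int PAYLOAD_BYTES = 300 * 1024;

    /** Builds producer settings with compression enabled. The broker address is a placeholder. */
    static Properties producerProps(String compressionType) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("compression.type", compressionType); // e.g. "snappy" or "gzip"
        return props;
    }

    /** Stand-in for a proto-encoded message: a fixed-size array with a repeating pattern. */
    static byte[] samplePayload() {
        byte[] payload = new byte[PAYLOAD_BYTES];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = (byte) ('a' + (i % 26));
        }
        return payload;
    }

    public static void main(String[] args) {
        Properties props = producerProps("gzip");
        byte[] payload = samplePayload();
        System.out.println("compression.type=" + props.getProperty("compression.type")
                + ", payload bytes=" + payload.length);
        // With kafka-clients on the classpath, these would feed a producer, e.g.:
        //   new KafkaProducer<String, byte[]>(props)
        //       .send(new ProducerRecord<>("topic-name", "key", payload));
    }
}
```

Running the same producer once with `compression.type` set to `none` gives the uncompressed baseline the issue compares against.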
> Message bytes duplication in Kafka headers when compression is enabled
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-15160
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15160
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, compression, consumer
>    Affects Versions: 3.2.3, 3.3.2
>            Reporter: Vikash Mishra
>            Assignee: Phuc Hong Tran
>            Priority: Critical
>         Attachments: dump-compressed-data-.7z, java heap dump.png,
> wireshark-min.png
>
>
> I created a Spring Kafka consumer using @KafkaListener.
> While doing so, I hit a scenario where, when data is compressed (any
> compression, snappy/gzip) and consumed by the consumer, the heap dump
> shows a " byte" occupying the same amount of memory as the message value.
> This behavior is seen only when compressed data is consumed, not when the
> data is uncompressed.
> I tried capturing the Kafka message through Wireshark; it shows the proper
> size of the data coming from the Kafka server and no extra bytes in the
> headers. So this is definitely something in the Kafka client. Spring does
> nothing related to compression; that functionality is handled entirely
> inside the Kafka client library.
> Attached are screenshots of the heap dump and of Wireshark.
> This seems like a critical issue, as the message size in memory almost
> doubles, impacting consumer memory and performance. Somewhere it feels like
> the actual message value is copied to headers?
> *To Reproduce*
> # Produce compressed data on any topic.
> # Create a simple consumer consuming from the above-created topic.
> # Capture heap dump.
> *Expected behavior*
> Headers should not show bytes consuming memory equivalent to value.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
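For the "Capture heap dump" step in the quoted report, one option is to trigger the dump from inside the consumer JVM. The sketch below assumes a HotSpot-based JVM, since it relies on `com.sun.management.HotSpotDiagnosticMXBean`; the class name and output file name are hypothetical, and this is not necessarily how the attached dump was produced.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

public class HeapDumper {

    /**
     * Writes a heap dump of the current JVM to the given .hprof file.
     * The target file must not already exist. With liveOnly = true,
     * only reachable objects are included.
     */
    static void dumpHeap(Path file, boolean liveOnly) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(file.toString(), liveOnly);
    }

    public static void main(String[] args) throws IOException {
        // Use a fresh temp directory so the output file is guaranteed not to exist.
        Path dir = Files.createTempDirectory("heapdump");
        Path out = dir.resolve("consumer.hprof");
        dumpHeap(out, true);
        System.out.println("wrote " + out + " (" + Files.size(out) + " bytes)");
    }
}
```

The resulting `.hprof` file can be opened in Eclipse Memory Analyzer, the tool mentioned in the comment.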