We compress a batch of messages together, but we need to give each message
its own offset (and know its key if we want to use topic compaction), so the
broker decompresses and recompresses the messages.
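
To illustrate, here is a simplified sketch of that broker-side step (the
helper methods are hypothetical stand-ins, not actual Kafka code):

import java.util.List;

// Simplified sketch of broker-side offset assignment (not actual Kafka code).
// Each message inside a compressed batch carries an absolute offset, so the
// broker must unpack the batch, assign offsets, and pack it again.
public class OffsetAssignmentSketch {

    static class Message {
        long offset;
        byte[] payload;
    }

    // Hypothetical stand-ins for the broker's codec handling.
    static List<Message> decompress(byte[] batch) { throw new UnsupportedOperationException(); }
    static byte[] compress(List<Message> batch) { throw new UnsupportedOperationException(); }

    static byte[] assignOffsets(byte[] compressedBatch, long nextOffset) {
        List<Message> batch = decompress(compressedBatch); // CPU cost #1
        for (Message m : batch) {
            m.offset = nextOffset++; // absolute offset written per inner message
        }
        return compress(batch); // CPU cost #2: the recompression we want to skip
    }
}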
We are working on an improvement to add relative offsets, which will allow
the broker to skip this recompression.

Gwen

On Tue, Oct 20, 2015 at 3:18 AM, Jérôme BAROTIN <jer...@barotin.fr> wrote:
> Hi,
>
> I was 100% sure that the Kafka broker didn't compress data, and I didn't
> think I had to upgrade my broker to 0.8.2.2.
>
> I tried the upgrade, and it works now!
>
> I still don't understand why the broker needs to compress the data again
> (if the compression is already done in the producer). Do you have a link
> to a wiki page, documentation or anything else to share about that?
>
> Anyway, thanks for your help in solving this issue.
>
> Regards,
>
> Jérôme
>
> 2015-10-20 1:26 GMT+02:00 Jun Rao <j...@confluent.io>:
>> You will need to upgrade the broker to 0.8.2.2. The broker currently
>> recompresses messages. In 0.8.2.1, the snappy jar has a bug that causes
>> data explosion. We fixed the snappy jar in 0.8.2.2. If you upgrade the
>> broker to 0.8.2.2, it will pick up the fixed snappy jar.
>>
>> Thanks,
>>
>> Jun
>>
>> On Sat, Oct 17, 2015 at 1:21 AM, Jérôme BAROTIN <jer...@barotin.fr> wrote:
>>
>>> Hello,
>>>
>>> I want to check whether snappy compression works well with the Java
>>> Kafka client.
>>>
>>> To do this, I set up a small program. It generates 1024 messages of
>>> readable data, 1024 bytes each. I send these messages to three new
>>> topics, and afterwards I check the size of these topics directly on
>>> the broker filesystem.
>>>
>>> Here is the Java code of the program:
>>>
>>> package unit_test.testCompress;
>>>
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import java.util.Random;
>>> import java.util.concurrent.Future;
>>>
>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>> import org.apache.kafka.clients.producer.RecordMetadata;
>>>
>>> /**
>>>  * Can be used to run some unit tests on compression.
>>>  */
>>> public class TestCompress {
>>>
>>>     public static void compress(String type, String version) {
>>>         Map<String, Object> configs = new HashMap<String, Object>();
>>>         configs.put("key.serializer",
>>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>>         // "producer.type" and "partitioner.class" are old (Scala) producer
>>>         // settings; the new KafkaProducer ignores unknown configs.
>>>         configs.put("producer.type", "async");
>>>         configs.put("compression.type", type);
>>>         configs.put("value.serializer",
>>>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>>>         configs.put("partitioner.class",
>>>                 "com.astellia.astkafkaproducer.RecordPartitioner");
>>>         configs.put("bootstrap.servers", "kafka:9092");
>>>
>>>         KafkaProducer<String, byte[]> producer =
>>>                 new KafkaProducer<String, byte[]>(configs);
>>>
>>>         Random r = new Random(15415485);
>>>         int size = 1024; // 1 KB per message
>>>         byte[] buffer = new byte[size];
>>>         for (int i = 0; i < size; i++) {
>>>             // r.nextInt(26) rather than r.nextInt() % 26, which can be
>>>             // negative and would produce non-letter bytes
>>>             buffer[i] = (byte) ('A' + r.nextInt(26));
>>>         }
>>>         buffer[size - 1] = 0;
>>>         //System.out.println(new String(buffer));
>>>         for (int i = 0; i < size; i++) { // size doubles as the message count
>>>             Future<RecordMetadata> result = producer.send(
>>>                     new ProducerRecord<String, byte[]>(
>>>                             "unit_test_compress_" + version + "_" + type,
>>>                             buffer));
>>>         }
>>>
>>>         producer.close();
>>>     }
>>>
>>>     public static void main(String[] args) {
>>>         String version = "v10";
>>>         compress("snappy", version);
>>>         compress("gzip", version);
>>>         compress("none", version);
>>>     }
>>> }
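>>>
>>> As a side check (a small standalone snippet, not part of the program
>>> above), the snappy-java version actually loaded can be printed like this:
>>>
>>> public class SnappyVersionCheck {
>>>     public static void main(String[] args) {
>>>         // Prints the version of the snappy-java library found on the classpath.
>>>         System.out.println(org.xerial.snappy.Snappy.getNativeLibraryVersion());
>>>     }
>>> }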
>>>
>>> I'm compiling this code with the following Maven pom file:
>>>
>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>          http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>   <modelVersion>4.0.0</modelVersion>
>>>
>>>   <groupId>unit_test</groupId>
>>>   <artifactId>testCompress</artifactId>
>>>   <version>0.0.1-SNAPSHOT</version>
>>>   <packaging>jar</packaging>
>>>
>>>   <name>testCompress</name>
>>>   <url>http://maven.apache.org</url>
>>>
>>>   <properties>
>>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>   </properties>
>>>
>>>   <dependencies>
>>>     <dependency>
>>>       <groupId>org.apache.kafka</groupId>
>>>       <artifactId>kafka_2.10</artifactId>
>>>       <version>0.8.2.2</version>
>>>     </dependency>
>>>   </dependencies>
>>> </project>
>>>
>>> The program runs fine on my computer.
>>>
>>> But when I check the disk space taken by each topic directly on my
>>> Kafka broker with the command line tool "du", I find:
>>> - the gzip topic is compressed: OK
>>> - the none topic is not compressed: OK
>>> - but the snappy topic is not compressed: not OK
>>> (a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)
>>>
>>> I also checked the stored log file with vi, and the data is still in
>>> clear text.
>>>
>>> I'm aware of this issue in Kafka 0.8.2.1:
>>> https://issues.apache.org/jira/browse/KAFKA-2189
>>>
>>> But I'm using Kafka 0.8.2.2 on the producer and Kafka 0.8.2.1 on the
>>> broker.
>>>
>>> I checked the snappy dependency as well: I'm using version 1.1.1.7.
>>>
>>> Do you have an idea of how to enable snappy compression on Kafka?
>>> Did I forget a parameter to enable snappy compression?
>>> Are my Kafka versions incompatible?
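>>>
>>> For reference, the snappy version can be pinned explicitly in the pom
>>> with the standard org.xerial.snappy coordinates (this is the 1.1.1.7
>>> dependency mentioned above):
>>>
>>> <dependency>
>>>   <groupId>org.xerial.snappy</groupId>
>>>   <artifactId>snappy-java</artifactId>
>>>   <version>1.1.1.7</version>
>>> </dependency>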