We compress a batch of messages together, but we need to give each
message its own offset (and know its key if we want to use topic
compaction), so the broker decompresses and recompresses the messages.
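
Conceptually, the broker-side write path looks something like the
sketch below. This is a hypothetical illustration, not Kafka's actual
broker code; the Message class and the decompress/recompress/store
stubs stand in for the real message-set and codec machinery:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch: a compressed batch arrives as one wrapper
    // message, so the broker must inflate it before it can stamp
    // per-message offsets, then compress it all over again.
    public class OffsetAssignmentSketch {

        static class Message {
            long offset;
            byte[] value;
        }

        // Stand-ins for the real codec and log calls (snappy/gzip etc.).
        static List<Message> decompress(byte[] wrapper) { return new ArrayList<Message>(); }
        static byte[] recompress(List<Message> batch) { return new byte[0]; }
        static void store(byte[] wrapper) { }

        static long appendCompressedBatch(byte[] wrapper, long nextOffset) {
            List<Message> batch = decompress(wrapper);
            for (Message m : batch) {
                m.offset = nextOffset++; // each message needs its own absolute offset
            }
            store(recompress(batch));    // the compression cost is paid a second time
            return nextOffset;
        }
    }

With offsets stored relative to the wrapper, only the wrapper's own
offset would need updating, which is what makes skipping the
recompression step possible.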

We are working on an improvement that adds relative offsets, which
will allow the broker to skip this recompression.

Gwen

On Tue, Oct 20, 2015 at 3:18 AM, Jérôme BAROTIN <jer...@barotin.fr> wrote:
> Hi,
>
> I was 100% sure that the Kafka broker didn't compress data, so I
> didn't think I had to upgrade my broker to 0.8.2.2.
>
> I tried the upgrade, and it works now!
>
> I still don't understand why the broker needs to compress the data
> again (if compression is already done in the producer). Do you have a
> link to a wiki page, documentation, or anything else about that?
>
> Anyway, thanks for your help in solving this issue.
>
> Regards,
>
> Jérôme
>
> 2015-10-20 1:26 GMT+02:00 Jun Rao <j...@confluent.io>:
>> You will need to upgrade the broker to 0.8.2.2. The broker currently
>> recompresses messages. In 0.8.2.1, the snappy jar has a bug that causes
>> data explosion. We fixed the snappy jar in 0.8.2.2. If you upgrade the
>> broker to 0.8.2.2, it will pick up the fixed snappy jar.
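>>
>> To confirm which snappy-java version is actually loaded at runtime (a
>> quick sanity check; this assumes snappy-java is on the classpath), a
>> small program like this should print it:
>>
>>     import org.xerial.snappy.Snappy;
>>
>>     // Prints the version of the snappy-java library that is actually
>>     // loaded, which helps verify that the fixed jar was picked up
>>     // after the upgrade.
>>     public class SnappyVersionCheck {
>>         public static void main(String[] args) {
>>             System.out.println(Snappy.getNativeLibraryVersion());
>>         }
>>     }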
>>
>> Thanks,
>>
>> Jun
>>
>> On Sat, Oct 17, 2015 at 1:21 AM, Jérôme BAROTIN <jer...@barotin.fr> wrote:
>>
>>> Hello,
>>>
>>> I want to check whether snappy compression works correctly with the
>>> Java Kafka client.
>>>
>>> To do this, I set up a small program. It generates 1024 messages of
>>> readable data, each 1024 bytes in size. I send these messages to
>>> three new topics, and afterwards I check the size of these topics
>>> directly on the broker filesystem.
>>>
>>> Here is the program's Java code:
>>>
>>>     package unit_test.testCompress;
>>>
>>>     import java.util.HashMap;
>>>     import java.util.Map;
>>>     import java.util.Random;
>>>
>>>     import org.apache.kafka.clients.producer.KafkaProducer;
>>>     import org.apache.kafka.clients.producer.ProducerRecord;
>>>
>>>     /**
>>>      * Can be used to run some unit tests on compression.
>>>      */
>>>     public class TestCompress {
>>>
>>>         public static void compress(String type, String version) {
>>>             Map<String, Object> configs = new HashMap<String, Object>();
>>>             configs.put("key.serializer",
>>>                 "org.apache.kafka.common.serialization.StringSerializer");
>>>             configs.put("value.serializer",
>>>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>>>             configs.put("compression.type", type);
>>>             configs.put("bootstrap.servers", "kafka:9092");
>>>             // Note: "producer.type" is an old-producer setting that the
>>>             // new KafkaProducer ignores, and the custom partitioner.class
>>>             // entry (com.astellia.astkafkaproducer.RecordPartitioner) is
>>>             // not needed for this test, so both are omitted here.
>>>
>>>             KafkaProducer<String, byte[]> producer =
>>>                 new KafkaProducer<String, byte[]>(configs);
>>>
>>>             // Fill a 1 KB buffer with readable (letters-only) data.
>>>             Random r = new Random(15415485);
>>>             int size = 1024; // 1 KB
>>>             byte[] buffer = new byte[size];
>>>             for (int i = 0; i < size; i++) {
>>>                 buffer[i] = (byte) ('A' + r.nextInt(26));
>>>             }
>>>             buffer[size - 1] = 0;
>>>
>>>             // Send 1024 copies of the buffer to a fresh topic.
>>>             for (int i = 0; i < size; i++) {
>>>                 producer.send(new ProducerRecord<String, byte[]>(
>>>                     "unit_test_compress_" + version + "_" + type, buffer));
>>>             }
>>>
>>>             producer.close();
>>>         }
>>>
>>>         public static void main(String[] args) {
>>>             String version = "v10";
>>>             compress("snappy", version);
>>>             compress("gzip", version);
>>>             compress("none", version);
>>>         }
>>>     }
>>>
>>>
>>> I compile this code with the following Maven POM file:
>>>
>>>     <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>             http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>       <modelVersion>4.0.0</modelVersion>
>>>
>>>       <groupId>unit_test</groupId>
>>>       <artifactId>testCompress</artifactId>
>>>       <version>0.0.1-SNAPSHOT</version>
>>>       <packaging>jar</packaging>
>>>
>>>       <name>testCompress</name>
>>>       <url>http://maven.apache.org</url>
>>>
>>>       <properties>
>>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>       </properties>
>>>
>>>       <dependencies>
>>>         <dependency>
>>>           <groupId>org.apache.kafka</groupId>
>>>           <artifactId>kafka_2.10</artifactId>
>>>           <version>0.8.2.2</version>
>>>         </dependency>
>>>       </dependencies>
>>>     </project>
>>>
>>> This program runs fine on my computer.
>>>
>>> But when I check the space taken by each topic directly on the
>>> broker filesystem with the command-line tool "du", I find:
>>> - the gzip topic is compressed: OK
>>> - the none topic is not compressed: OK
>>> - but the snappy topic is not compressed: not OK
>>> (a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)
>>>
>>>
>>> I also checked the stored file with vi, and the data is still in
>>> the clear.
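>>>
>>> A quick way to separate codec behaviour from broker behaviour is to
>>> compress the same buffer directly with snappy-java (a minimal
>>> sketch, assuming snappy-java 1.1.1.7 on the classpath; note that an
>>> LZ-style codec like snappy may not shrink a short random-letter
>>> buffer much, unlike gzip, which also applies entropy coding):
>>>
>>>     import java.util.Random;
>>>
>>>     import org.xerial.snappy.Snappy;
>>>
>>>     // Compresses the same 1 KB "readable" buffer with snappy-java
>>>     // directly, to see what the codec alone does to this payload.
>>>     public class SnappySanityCheck {
>>>         public static void main(String[] args) throws Exception {
>>>             Random r = new Random(15415485);
>>>             byte[] buffer = new byte[1024];
>>>             for (int i = 0; i < buffer.length; i++) {
>>>                 buffer[i] = (byte) ('A' + r.nextInt(26));
>>>             }
>>>             byte[] compressed = Snappy.compress(buffer);
>>>             System.out.println("raw=" + buffer.length
>>>                     + " compressed=" + compressed.length);
>>>         }
>>>     }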
>>>
>>> I'm aware of this issue in Kafka 0.8.2.1:
>>> https://issues.apache.org/jira/browse/KAFKA-2189
>>>
>>> But I'm using Kafka 0.8.2.2 on the producer side and Kafka 0.8.2.1
>>> on the broker.
>>>
>>> I checked the Snappy dependency as well; I'm using 1.1.1.7.
>>>
>>> Do you have an idea how to enable snappy compression in Kafka?
>>> Did I forget a parameter that enables snappy compression?
>>> Are my Kafka versions incompatible?
>>>
