You will need to upgrade the broker to 0.8.2.2. The broker currently
recompresses messages, and in 0.8.2.1 the snappy jar has a bug that causes
data explosion. We fixed the snappy jar in 0.8.2.2, so if you upgrade the
broker to 0.8.2.2, it will pick up the fixed jar.
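
If you want to double check which snappy-java version actually ends up on a
classpath (for example by running a small check with the broker's libs
directory on the classpath), something like the sketch below should work.
The class name is just illustrative:

    import org.xerial.snappy.Snappy;

    public class SnappyVersionCheck {
        public static void main(String[] args) throws Exception {
            // reports the snappy version bundled in the snappy-java jar on the classpath
            System.out.println("snappy version: " + Snappy.getNativeLibraryVersion());

            // quick round trip: highly repetitive input should shrink a lot
            byte[] input = new byte[1024];
            java.util.Arrays.fill(input, (byte) 'A');
            byte[] compressed = Snappy.compress(input);
            System.out.println("1024 bytes of 'A' -> " + compressed.length + " bytes");
        }
    }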

Thanks,

Jun

On Sat, Oct 17, 2015 at 1:21 AM, Jérôme BAROTIN <jer...@barotin.fr> wrote:

> Hello,
>
> I want to check whether snappy compression works properly with the Java
> Kafka client.
>
> To do this, I set up a small program. The program generates 1024 messages
> of readable data, each 1024 bytes in size. I send these messages to three
> new topics and then check the size of each topic directly on the broker
> filesystem.
>
> Here is the Java code of the program:
>
>     package unit_test.testCompress;
>
>     import java.util.HashMap;
>     import java.util.Map;
>     import java.util.Random;
>     import java.util.concurrent.Future;
>
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>     import org.apache.kafka.clients.producer.RecordMetadata;
>
>
>     /**
>      * Can be used to run some unit tests on compression
>      */
>     public class TestCompress {
>
>         public static void compress(String type, String version){
>             Map<String,Object> configs = new HashMap<String,Object>();
>             configs.put("key.serializer",
>                     "org.apache.kafka.common.serialization.StringSerializer");
>             // "producer.type" is a setting of the old Scala producer; the new
>             // Java producer does not recognize it and simply ignores it
>             configs.put("producer.type", "async");
>             configs.put("compression.type", type);
>             configs.put("value.serializer",
>                     "org.apache.kafka.common.serialization.ByteArraySerializer");
>             configs.put("partitioner.class",
>                     "com.astellia.astkafkaproducer.RecordPartitioner");
>             configs.put("bootstrap.servers", "kafka:9092");
>
>             KafkaProducer<String, byte[]> producer =
>                     new KafkaProducer<String, byte[]>(configs);
>
>             Random r = new Random(15415485);
>             int size = 1024; // 1 KB
>             byte[] buffer = new byte[size];
>             // fill the buffer with pseudo-random readable characters
>             for(int i = 0; i < size; i++){
>                 buffer[i] = (byte) ('A' + (r.nextInt() % 26));
>             }
>             buffer[size-1] = 0;
>             //System.out.println(new String(buffer));
>             // send 1024 copies of the buffer to a topic named after the
>             // compression type under test
>             for(int i = 0; i < size; i++ ){
>                 Future<RecordMetadata> result = producer.send(
>                         new ProducerRecord<String, byte[]>(
>                                 "unit_test_compress_" + version + "_" + type, buffer));
>             }
>
>             producer.close();
>         }
>
>         public static void main(String[] args) {
>
>             String version = "v10";
>             compress("snappy",version);
>             compress("gzip",version);
>             compress("none",version);
>
>         }
>
>     }
>
>
> I'm compiling this code with the following Maven pom file:
>
>     <project xmlns="http://maven.apache.org/POM/4.0.0"
>         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>             http://maven.apache.org/xsd/maven-4.0.0.xsd">
>       <modelVersion>4.0.0</modelVersion>
>
>       <groupId>unit_test</groupId>
>       <artifactId>testCompress</artifactId>
>       <version>0.0.1-SNAPSHOT</version>
>       <packaging>jar</packaging>
>
>       <name>testCompress</name>
>       <url>http://maven.apache.org</url>
>
>       <properties>
>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>       </properties>
>
>       <dependencies>
>          <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.10</artifactId>
>             <version>0.8.2.2</version>
>         </dependency>
>       </dependencies>
>     </project>
>
> This program runs fine on my computer.
>
> But when I check the result directly on my Kafka broker, using the
> command line tool "du" to measure the space taken by each topic, I find:
> - the gzip topic is compressed, that's OK
> - the none topic is not compressed, that's OK
> - but the snappy topic is not compressed, and that's not OK
> (a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)
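>
> (For reference, a rough Java equivalent of this "du" check, which sums the
> size of the log files under each topic-partition directory; the
> /tmp/kafka-logs path is only an example and has to match the broker's
> log.dirs setting:)
>
>     import java.io.File;
>
>     public class TopicDiskUsage {
>         public static void main(String[] args) {
>             // broker log directory; adjust to the broker's log.dirs config
>             File logDir = new File("/tmp/kafka-logs");
>             File[] partitionDirs = logDir.listFiles();
>             if (partitionDirs == null) return;
>             for (File partitionDir : partitionDirs) {
>                 if (!partitionDir.isDirectory()) continue;
>                 long bytes = 0;
>                 // sum the size of every segment/index file in the partition
>                 for (File segment : partitionDir.listFiles()) {
>                     bytes += segment.length();
>                 }
>                 System.out.println(partitionDir.getName() + ": " + bytes + " bytes");
>             }
>         }
>     }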
>
>
> I also checked the stored log files with vi, and the data is still in clear text.
>
> I'm aware of this issue in Kafka 0.8.2.1:
> https://issues.apache.org/jira/browse/KAFKA-2189
>
> But I'm using Kafka 0.8.2.2 on the producer and Kafka 0.8.2.1 on the broker.
>
> I checked the Snappy dependency as well: I'm using version 1.1.1.7.
>
> Do you have an idea of how to enable snappy compression on Kafka?
> Did I forget a parameter to enable snappy compression on Kafka?
> Are my Kafka versions incompatible?
>
