Sounds similar to my experience right now. We have 110 GB of JSON data that I
convert to AVRO and upload to Kafka 0.8.2.2 using the 0.8.2.2 Java producer
with snappy compression. The logs in Kafka end up 53 GB in size, which is
much bigger than I'd expect. I haven't had time to dig into this problem
yet.
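As a rough sanity check, independent of Kafka: compress a sample payload
directly with snappy-java (the library Kafka's snappy support is built on)
and compare sizes. A minimal sketch; the payload below is a stand-in for one
of your AVRO-encoded records, and the figure is only a ballpark, since Kafka
compresses whole message batches rather than single records:

import java.nio.charset.StandardCharsets;
import org.xerial.snappy.Snappy;

public class SnappyRatioCheck {
    public static void main(String[] args) throws Exception {
        // Stand-in payload; replace with one of your AVRO-encoded records.
        byte[] sample = "{\"example\":\"record\"}".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = Snappy.compress(sample);
        System.out.printf("raw=%d bytes, compressed=%d bytes, ratio=%.2f%n",
                sample.length, compressed.length,
                (double) compressed.length / sample.length);
    }
}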
Lukas
-----Original Message-----
From: Jérôme BAROTIN
Sent: Saturday, October 17, 2015 1:21 AM
To: users@kafka.apache.org
Subject: Kafka 0.8.2.2 doesn't compress with snappy
Hello,
I want to check whether snappy compression works correctly with the Java
Kafka client.
To do this, I set up a small program. It generates 1024 messages of readable
data, 1024 bytes each, and sends them to three new topics, one per
compression type; afterwards I check the size of each topic directly on the
broker filesystem.
Here is the program's Java code:
package unit_test.testCompress;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Can be used to run some unit tests on compression.
 */
public class TestCompress {

    public static void compress(String type, String version) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Old (Scala) producer setting; the new Java producer ignores unknown configs.
        configs.put("producer.type", "async");
        configs.put("compression.type", type);
        configs.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Custom class from the author's project; also unknown to the 0.8.2 Java producer.
        configs.put("partitioner.class",
                "com.astellia.astkafkaproducer.RecordPartitioner");
        configs.put("bootstrap.servers", "kafka:9092");
        KafkaProducer<String, byte[]> producer =
                new KafkaProducer<String, byte[]>(configs);

        // Fill a 1 KB buffer with readable data: random letters A-Z.
        Random r = new Random(15415485);
        int size = 1024; // 1 KB
        byte[] buffer = new byte[size];
        for (int i = 0; i < size; i++) {
            // r.nextInt(26) instead of r.nextInt() % 26, which can be negative
            buffer[i] = (byte) ('A' + r.nextInt(26));
        }
        buffer[size - 1] = 0;
        //System.out.println(new String(buffer));

        // Send 1024 copies of the buffer to a topic named after the compression type.
        for (int i = 0; i < size; i++) {
            Future<RecordMetadata> result = producer.send(
                    new ProducerRecord<String, byte[]>(
                            "unit_test_compress_" + version + "_" + type, buffer));
        }
        producer.close();
    }

    public static void main(String[] args) {
        String version = "v10";
        compress("snappy", version);
        compress("gzip", version);
        compress("none", version);
    }
}
I compile this code with the following Maven POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>unit_test</groupId>
  <artifactId>testCompress</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>testCompress</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.2</version>
    </dependency>
  </dependencies>
</project>
The program runs fine on my computer. But when I check the space taken by
each topic directly on the Kafka broker's filesystem with the command-line
tool "du", I find:
- the gzip topic is compressed: OK
- the none topic is not compressed: OK
- but the snappy topic is not compressed: not OK
(a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)
I also opened the stored log file in vi, and the data is still in clear text.
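For what it's worth, the "du" check can also be scripted. A minimal sketch,
assuming a broker log.dirs of /tmp/kafka-logs and partition 0 of the snappy
topic created by the test program (adjust both to your setup):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TopicDiskUsage {
    public static void main(String[] args) throws IOException {
        // Assumed layout: <log.dirs>/<topic>-<partition>
        Path logDir = Paths.get("/tmp/kafka-logs/unit_test_compress_v10_snappy-0");
        long bytes = Files.walk(logDir)
                .filter(Files::isRegularFile)
                .mapToLong(p -> p.toFile().length())
                .sum();
        System.out.println(logDir + ": " + bytes + " bytes on disk");
    }
}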
I'm aware of this issue in Kafka 0.8.2.1:
https://issues.apache.org/jira/browse/KAFKA-2189
But I'm using Kafka 0.8.2.2 on the producer and Kafka 0.8.2.1 on the broker.
I checked the snappy dependency as well: I'm using snappy-java 1.1.1.7.
Do you have any idea how to enable snappy compression in Kafka?
Did I forget a parameter that enables snappy compression?
Are my Kafka versions incompatible with each other?
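To rule out a broken snappy-java / native library, a quick compress/uncompress
round trip can be run outside Kafka, roughly like this (a sketch, assuming
snappy-java 1.1.1.7 is on the classpath):

import java.nio.charset.StandardCharsets;
import org.xerial.snappy.Snappy;

public class SnappyRoundTrip {
    public static void main(String[] args) throws Exception {
        byte[] input = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = Snappy.compress(input);
        byte[] restored = Snappy.uncompress(compressed);
        // Prints the version of the native snappy library that actually loaded.
        System.out.println("snappy native version: " + Snappy.getNativeLibraryVersion());
        System.out.println("round trip ok: "
                + new String(restored, StandardCharsets.UTF_8)
                        .equals(new String(input, StandardCharsets.UTF_8)));
    }
}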