Hi Pushkar,
On 7/25/19 10:51 AM, Pushkar Deole wrote:
Hi All,
I am new to Kafka and still getting myself acquainted with the product. I
have a basic question around using Kafka. I want to store in a Kafka topic,
a string value against some keys while a HashMap value against some of the
keys. For this purpose, I have created two different producers, shown
below. Note that I need two producers because I want to use generic types
properly: with a single producer for both String and Map values I would
need <String, Object> as the generic types, and Object is too generic,
which I don't want to allow. So I defined two different producers...
private static Producer<String, String> basicProducer = null;
private static Producer<String, Map> hashProducer = null;
Now, if I want to use other Streams classes such as KTable or GlobalKTable
to read my data, these classes also require generic types to be defined.
E.g. if I define a GlobalKTable like below, it won't work for a HashMap
stored against a key:
private static GlobalKTable<String, String> eventsGTable;
So, in order to allow a String as well as Map stored against key in a
topic, how should I go about it? Do I need to use <String, Object> as
generic types in all definitions and then cast from Object to String or Map
as per the type of instance stored in the object?
If you want to store String(s) as well as Map(s) into values of a single
topic and still be type-safe, I recommend introducing an indirection in
the form of an "Either" class. For example:
import java.util.Objects;

public final class Either<FST, SND> {

    public static <FST, SND> Either<FST, SND> first(FST first) {
        return new Either<>(Objects.requireNonNull(first), null);
    }

    public static <FST, SND> Either<FST, SND> second(SND second) {
        return new Either<>(null, Objects.requireNonNull(second));
    }

    private final FST fst;
    private final SND snd;

    private Either(FST fst, SND snd) {
        this.fst = fst;
        this.snd = snd;
    }

    public boolean isFirst() {
        return fst != null;
    }

    public boolean isSecond() {
        return snd != null;
    }

    public FST getFirst() {
        return Objects.requireNonNull(fst);
    }

    public SND getSecond() {
        return Objects.requireNonNull(snd);
    }
}
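
As a minimal sketch of how calling code might branch on such a value
without casts: the class EitherUsageSketch and the helper describe() are
hypothetical names used only for illustration, and the nested Either is a
trimmed copy of the class above so that the sketch compiles on its own.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical demo class; only the nested Either mirrors the class above.
final class EitherUsageSketch {

    // Trimmed copy of Either, nested here so the sketch is self-contained.
    static final class Either<FST, SND> {
        static <FST, SND> Either<FST, SND> first(FST first) {
            return new Either<>(Objects.requireNonNull(first), null);
        }

        static <FST, SND> Either<FST, SND> second(SND second) {
            return new Either<>(null, Objects.requireNonNull(second));
        }

        private final FST fst;
        private final SND snd;

        private Either(FST fst, SND snd) {
            this.fst = fst;
            this.snd = snd;
        }

        boolean isFirst() {
            return fst != null;
        }

        FST getFirst() {
            return Objects.requireNonNull(fst);
        }

        SND getSecond() {
            return Objects.requireNonNull(snd);
        }
    }

    // Hypothetical helper: picks the variant and uses it with its static type.
    static String describe(Either<String, Map<String, String>> value) {
        if (value.isFirst()) {
            String s = value.getFirst();
            return "string of length " + s.length();
        }
        Map<String, String> m = value.getSecond();
        return "map with " + m.size() + " entries";
    }

    public static void main(String[] args) {
        System.out.println(describe(Either.first("hello")));
        System.out.println(describe(Either.second(Collections.singletonMap("a", "1"))));
    }
}
```

The compiler knows the type of each branch, so no Object casts are needed
at the call site.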
(BTW, what types are the keys and values in your Map(s)? Let's say, for
the sake of this example, that you store Map<String, String> instances.)
You can then create KafkaConsumer<String, Either<String, Map<String,
String>>> and KafkaProducer<String, Either<String, Map<String, String>>>
instances using custom (de)serialization (Serializer, Deserializer,
Serde). Here's what I have used on such occasions:
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

/**
 * A {@link Serde} base implementation that is itself also a
 * {@link Serializer} and a {@link Deserializer}.
 */
public abstract class SelfSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void close() { }

    @Override
    public Serializer<T> serializer() {
        return this;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this;
    }

    /**
     * A {@link SelfSerde} implementation that serializes {@link Void} type
     * which has the only possible value {@code null}.
     */
    public static final SelfSerde<Void> VOID = new SelfSerde<Void>() {
        @Override
        public Void deserialize(String topic, byte[] data) {
            assert data == null;
            return null;
        }

        @Override
        public byte[] serialize(String topic, Void data) {
            return null;
        }
    };

    /**
     * A {@link SelfSerde} base implementation that delegates (de)serializing
     * to methods using {@link DataInput} and {@link DataOutput} API.
     */
    public static abstract class DataIO<T> extends SelfSerde<T> {

        @Override
        public final T deserialize(String topic, byte[] data) {
            if (data == null) return null;
            try {
                return deserialize(topic, data,
                    new DataInputStream(new ByteArrayInputStream(data)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public final byte[] serialize(String topic, T data) {
            if (data == null) return null;
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                serialize(topic, data, new DataOutputStream(baos));
                return baos.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        protected abstract T deserialize(String topic, byte[] data,
                                         DataInput in) throws IOException;

        protected abstract void serialize(String topic, T data,
                                          DataOutput out) throws IOException;

        protected static <T> void write(String topic, DataOutput out, T value,
                                        Serde<T> serde) throws IOException {
            byte[] bytes = serde.serializer().serialize(topic, value);
            if (bytes == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        }

        protected static <T> T read(String topic, DataInput in,
                                    Serde<T> serde) throws IOException {
            int size = in.readInt();
            byte[] bytes;
            if (size < 0) {
                bytes = null;
            } else {
                in.readFully(bytes = new byte[size]);
            }
            return serde.deserializer().deserialize(topic, bytes);
        }
    }

    static abstract class MapSerde<K, V> extends SelfSerde.DataIO<Map<K, V>> {
        final Serde<K> keySerde;
        final Serde<V> valSerde;

        MapSerde(Serde<K> keySerde, Serde<V> valSerde) {
            this.keySerde = Objects.requireNonNull(keySerde);
            this.valSerde = Objects.requireNonNull(valSerde);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            keySerde.configure(configs, isKey);
            valSerde.configure(configs, isKey);
        }

        @Override
        public void close() {
            keySerde.close();
            valSerde.close();
        }

        @Override
        protected void serialize(String topic, Map<K, V> map,
                                 DataOutput out) throws IOException {
            try {
                int[] size = new int[1];
                map.forEach((k, v) -> {
                    try {
                        write(topic, out, k, keySerde);
                        write(topic, out, v, valSerde);
                        size[0]++;
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                out.writeInt(size[0]);
            } catch (UncheckedIOException e) {
                throw e.getCause();
            }
        }
    }

    public static class HashMap<K, V> extends MapSerde<K, V> {

        public HashMap(Serde<K> keySerde, Serde<V> valSerde) {
            super(keySerde, valSerde);
        }

        @Override
        protected Map<K, V> deserialize(String topic, byte[] data,
                                        DataInput in) throws IOException {
            ByteBuffer bb = ByteBuffer.wrap(data, data.length - Integer.BYTES,
                                            Integer.BYTES);
            int size = bb.getInt();
            Map<K, V> map = new java.util.HashMap<>((size * 4 + 2) / 3);
            for (int i = 0; i < size; i++) {
                map.put(read(topic, in, keySerde), read(topic, in, valSerde));
            }
            return map;
        }
    }
}
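
To make the byte layout produced by the map serde above easier to see,
here is a minimal sketch that uses only the JDK and fixes keys and values
to UTF-8 strings (an assumption made for this illustration). The class
MapLayoutSketch and its methods are hypothetical names. Each key and value
is written as a 4-byte length followed by its bytes (length -1 for null),
and the entry count is written last, which is why decoding reads the
trailing 4 bytes first.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, JDK-only illustration of the layout; not part of the serde above.
final class MapLayoutSketch {

    // Layout: [len][key bytes][len][value bytes] ... [entry count]
    static byte[] encode(Map<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            for (Map.Entry<String, String> e : map.entrySet()) {
                writeChunk(out, e.getKey());
                writeChunk(out, e.getValue());
            }
            out.writeInt(map.size()); // trailing entry count
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, String> decode(byte[] data) {
        try {
            // The entry count lives in the last 4 bytes (big-endian, like writeInt).
            int size = ByteBuffer
                .wrap(data, data.length - Integer.BYTES, Integer.BYTES)
                .getInt();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, String> map = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                String key = readChunk(in);
                String value = readChunk(in);
                map.put(key, value);
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writeChunk(DataOutputStream out, String s) throws IOException {
        if (s == null) {
            out.writeInt(-1); // null marker
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static String readChunk(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Writing the count at the end means the map can be streamed out in one pass
without knowing its size up front.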
Here's also a Serde implementation for Either<FST, SND> instances:
import org.apache.kafka.common.serialization.Serde;

import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

public final class EitherSerde<FST, SND> extends SelfSerde<Either<FST, SND>> {

    private final Serde<FST> fstSerde;
    private final Serde<SND> sndSerde;

    public EitherSerde(Serde<FST> fstSerde, Serde<SND> sndSerde) {
        this.fstSerde = Objects.requireNonNull(fstSerde);
        this.sndSerde = Objects.requireNonNull(sndSerde);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        fstSerde.configure(configs, isKey);
        sndSerde.configure(configs, isKey);
    }

    @Override
    public void close() {
        fstSerde.close();
        sndSerde.close();
    }

    @Override
    public Either<FST, SND> deserialize(String topic, byte[] data) {
        if (data == null) return null;
        byte[] elementData = Arrays.copyOfRange(data, 0, data.length - 1);
        byte id = data[data.length - 1];
        switch (id) {
            case 0:
                return Either.first(
                    fstSerde.deserializer().deserialize(topic, elementData));
            case 1:
                return Either.second(
                    sndSerde.deserializer().deserialize(topic, elementData));
            default:
                throw new UncheckedIOException(new StreamCorruptedException(
                    "Expected 0 or 1 as last byte of Either"));
        }
    }

    @Override
    public byte[] serialize(String topic, Either<FST, SND> oneof2) {
        if (oneof2 == null) return null;
        byte[] elementData;
        byte id;
        if (oneof2.isFirst()) {
            elementData = fstSerde.serializer().serialize(topic, oneof2.getFirst());
            id = 0;
        } else { // oneof2.isSecond()
            elementData = sndSerde.serializer().serialize(topic, oneof2.getSecond());
            id = 1;
        }
        byte[] data = Arrays.copyOf(elementData, elementData.length + 1);
        data[data.length - 1] = id;
        return data;
    }
}
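
The tagging scheme used above (payload bytes followed by one discriminator
byte) can be tried out in isolation with this minimal JDK-only sketch. The
class TagByteSketch and its method names are hypothetical and exist only
for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, JDK-only illustration of the trailing tag byte.
final class TagByteSketch {

    // Appends the discriminator byte after the payload.
    static byte[] tag(byte[] payload, byte id) {
        byte[] data = Arrays.copyOf(payload, payload.length + 1);
        data[data.length - 1] = id;
        return data;
    }

    // Reads the discriminator from the last byte.
    static byte tagOf(byte[] data) {
        return data[data.length - 1];
    }

    // Returns everything except the last byte.
    static byte[] payloadOf(byte[] data) {
        return Arrays.copyOfRange(data, 0, data.length - 1);
    }

    public static void main(String[] args) {
        byte[] tagged = tag("hi".getBytes(StandardCharsets.UTF_8), (byte) 0);
        System.out.println("tag = " + tagOf(tagged));
        System.out.println("payload = "
            + new String(payloadOf(tagged), StandardCharsets.UTF_8));
    }
}
```

Note that, like the serde above, this sketch assumes the payload is never
null and the tagged array is never empty; Arrays.copyOf would throw a
NullPointerException on a null payload.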
You could then use the above utilities in the following way:
SelfSerde<Either<String, Map<String, String>>> valSerde =
    new EitherSerde<>(
        Serdes.String(),
        new SelfSerde.HashMap<>(Serdes.String(), Serdes.String())
    );

Properties props = ...;
Consumer<String, Either<String, Map<String, String>>> consumer =
    new KafkaConsumer<>(
        props,
        Serdes.String().deserializer(),
        valSerde
    );

// or:

Properties props = ...;
Producer<String, Either<String, Map<String, String>>> producer =
    new KafkaProducer<>(
        props,
        Serdes.String().serializer(),
        valSerde
    );
With custom (de)serialization you get compact (small) messages and low
CPU usage, neither of which you get with Java's default serialization.
And you can still enjoy type-safe code :-)
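
As a rough way to check the size part of that claim, the following
JDK-only sketch compares the number of bytes Java's built-in serialization
produces for a small map with the size of a length-prefixed encoding
like the one in the map serde above. The class SizeComparisonSketch and
its methods are hypothetical, UTF-8 string keys and values are an
assumption, and the sketch measures size only, not CPU usage.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only size comparison; not a benchmark.
final class SizeComparisonSketch {

    // Bytes produced by Java's default serialization (ObjectOutputStream).
    static int javaSerializedSize(Serializable value) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(value);
            }
            return baos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes needed for: [len][key][len][value] per entry, plus a trailing count.
    // Assumes non-null keys and values.
    static int lengthPrefixedSize(Map<String, String> map) {
        int size = Integer.BYTES; // trailing entry count
        for (Map.Entry<String, String> e : map.entrySet()) {
            size += Integer.BYTES + e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += Integer.BYTES + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return size;
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("k", "v");
        System.out.println("java serialization: " + javaSerializedSize(map) + " bytes");
        System.out.println("length-prefixed:    " + lengthPrefixedSize(map) + " bytes");
    }
}
```

Java serialization also writes a stream header and class metadata, which
the custom format does not need because the reader already knows the types.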
Regards, Peter