Hi Pushkar,

On 7/25/19 10:51 AM, Pushkar Deole wrote:
Hi All,

I am new to Kafka and still getting myself acquainted with the product. I
have a basic question around using Kafka. I want to store in a Kafka topic,
a string value against some keys while a HashMap value against some of the
keys. For this purpose, I have created two different producers as below
which I instantiate with two different producer instances. Note that I need
to create two different producers since I want to use generic types
properly, else with a single producer if I want to use the same producer to
store a String and Map then I will need to use <String, Object> in the
generic types and Object is too generic which I don't want to allow, so
defined two different producers...

private static Producer<String, String> basicProducer = null;
private static Producer<String, Map> hashProducer = null;

Now, if I want to use other Streams classes such as KTable or GlobalKTable
to read my data, these classes also require generic types to be defined,
e.g. if I define a GlobalKTable like the one below, it won't work
for a HashMap stored against a key:

private static GlobalKTable<String, String> eventsGTable;

So, in order to allow a String as well as Map stored against key in a
topic, how should I go about it? Do I need to use <String, Object> as
generic types in all definitions and then cast from Object to String or Map
as per the type of instance stored in the object?


If you want to store both Strings and Maps as values in a single topic and still be type-safe, I recommend introducing an indirection in the form of an "Either" class. For example:


import java.util.Objects;

public final class Either<FST, SND> {

    public static <FST, SND> Either<FST, SND> first(FST first) {
        return new Either<>(Objects.requireNonNull(first), null);
    }

    public static <FST, SND> Either<FST, SND> second(SND second) {
        return new Either<>(null, Objects.requireNonNull(second));
    }

    private final FST fst;
    private final SND snd;

    private Either(FST fst, SND snd) {
        this.fst = fst;
        this.snd = snd;
    }

    public boolean isFirst() {
        return fst != null;
    }

    public boolean isSecond() {
        return snd != null;
    }

    public FST getFirst() {
        return Objects.requireNonNull(fst);
    }

    public SND getSecond() {
        return Objects.requireNonNull(snd);
    }
}
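
To show how calling code might branch on such a value without casting from Object, here is a minimal sketch. It assumes the map alternative is a Map<String, String>; the class name EitherUsageSketch and the helper describe() are made up for illustration, and the nested Either is just a trimmed copy of the class above so the sketch compiles on its own.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class EitherUsageSketch {

    // Trimmed copy of the Either class above, repeated only to keep the sketch self-contained.
    public static final class Either<FST, SND> {
        private final FST fst;
        private final SND snd;

        private Either(FST fst, SND snd) {
            this.fst = fst;
            this.snd = snd;
        }

        public static <FST, SND> Either<FST, SND> first(FST first) {
            return new Either<>(Objects.requireNonNull(first), null);
        }

        public static <FST, SND> Either<FST, SND> second(SND second) {
            return new Either<>(null, Objects.requireNonNull(second));
        }

        public boolean isFirst() { return fst != null; }
        public FST getFirst() { return Objects.requireNonNull(fst); }
        public SND getSecond() { return Objects.requireNonNull(snd); }
    }

    // Hypothetical helper: handles whichever alternative is present, with no casts.
    public static String describe(Either<String, Map<String, String>> value) {
        if (value.isFirst()) {
            return "string of length " + value.getFirst().length();
        } else {
            return "map with " + value.getSecond().size() + " entries";
        }
    }

    public static void main(String[] args) {
        Either<String, Map<String, String>> a = Either.first("hello");
        Either<String, Map<String, String>> b = Either.second(Collections.singletonMap("k", "v"));
        System.out.println(describe(a));
        System.out.println(describe(b));
    }
}
```

The compiler checks both branches, so a value can only ever be treated as a String or as a Map<String, String>.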


(BTW, what type are the keys and values in your Map(s)? Let's say, for the sake of this example, that you store Map<String, String> instances.)

So you can then create KafkaConsumer<String, Either<String, Map<String, String>>> and KafkaProducer<String, Either<String, Map<String, String>>> instances using custom (de)serialization (Serializer, Deserializer, Serde). Here's what I have used on such occasions:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

/**
 * A {@link Serde} base implementation that is itself also a {@link Serializer} and a {@link Deserializer}.
 */
public abstract class SelfSerde<T> implements Serde<T>, Serializer<T>, Deserializer<T> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void close() { }

    @Override
    public Serializer<T> serializer() {
        return this;
    }

    @Override
    public Deserializer<T> deserializer() {
        return this;
    }

    /**
     * A {@link SelfSerde} implementation that serializes {@link Void} type
     * which has the only possible value {@code null}.
     */
    public static final SelfSerde<Void> VOID = new SelfSerde<Void>() {
        @Override
        public Void deserialize(String topic, byte[] data) {
            assert data == null;
            return null;
        }

        @Override
        public byte[] serialize(String topic, Void data) {
            return null;
        }
    };

    /**
     * A {@link SelfSerde} base implementation that delegates (de)serializing to methods using
     * {@link DataInput} and {@link DataOutput} API.
     */
    public static abstract class DataIO<T> extends SelfSerde<T> {
        @Override
        public final T deserialize(String topic, byte[] data) {
            if (data == null) return null;
            try {
                return deserialize(topic, data, new DataInputStream(new ByteArrayInputStream(data)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public final byte[] serialize(String topic, T data) {
            if (data == null) return null;
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                serialize(topic, data, new DataOutputStream(baos));
                return baos.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        protected abstract T deserialize(String topic, byte[] data, DataInput in) throws IOException;

        protected abstract void serialize(String topic, T data, DataOutput out) throws IOException;

        protected static <T> void write(String topic, DataOutput out, T value, Serde<T> serde) throws IOException {
            byte[] bytes = serde.serializer().serialize(topic, value);
            if (bytes == null) {
                out.writeInt(-1);
            } else {
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        }

        protected static <T> T read(String topic, DataInput in, Serde<T> serde) throws IOException {
            int size = in.readInt();
            byte[] bytes;
            if (size < 0) {
                bytes = null;
            } else {
                in.readFully(bytes = new byte[size]);
            }
            return serde.deserializer().deserialize(topic, bytes);
        }
    }

    static abstract class MapSerde<K, V> extends SelfSerde.DataIO<Map<K, V>> {

        final Serde<K> keySerde;
        final Serde<V> valSerde;

        MapSerde(Serde<K> keySerde, Serde<V> valSerde) {
            this.keySerde = Objects.requireNonNull(keySerde);
            this.valSerde = Objects.requireNonNull(valSerde);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            keySerde.configure(configs, isKey);
            valSerde.configure(configs, isKey);
        }

        @Override
        public void close() {
            keySerde.close();
            valSerde.close();
        }

        @Override
        protected void serialize(String topic, Map<K, V> map, DataOutput out) throws IOException {
            try {
                int[] size = new int[1];
                map.forEach((k, v) -> {
                    try {
                        write(topic, out, k, keySerde);
                        write(topic, out, v, valSerde);
                        size[0]++;
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                out.writeInt(size[0]);
            } catch (UncheckedIOException e) {
                throw e.getCause();
            }
        }
    }

    public static class HashMap<K, V> extends MapSerde<K, V> {

        public HashMap(Serde<K> keySerde, Serde<V> valSerde) {
            super(keySerde, valSerde);
        }

        @Override
        protected Map<K, V> deserialize(String topic, byte[] data, DataInput in) throws IOException {
            ByteBuffer bb = ByteBuffer.wrap(data, data.length - Integer.BYTES, Integer.BYTES);
            int size = bb.getInt();
            Map<K, V> map = new java.util.HashMap<>((size * 4 + 2) / 3);
            for (int i = 0; i < size; i++) {
                map.put(read(topic, in, keySerde), read(topic, in, valSerde));
            }
            return map;
        }
    }
}
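
The byte layout the map serde produces may be easier to see without the Kafka types. The following is only a sketch of that layout using plain java.io, assuming String keys and values encoded as UTF-8; the class name MapLayoutSketch and its methods are made up for illustration and are not part of the classes above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MapLayoutSketch {

    // Writes every key and value as [int length][UTF-8 bytes], then the entry count last.
    public static byte[] encode(Map<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            for (Map.Entry<String, String> e : map.entrySet()) {
                writeString(out, e.getKey());
                writeString(out, e.getValue());
            }
            out.writeInt(map.size());
            return baos.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // Reads the entry count from the last four bytes, then the entries from the start.
    public static Map<String, String> decode(byte[] data) {
        try {
            int size = ByteBuffer.wrap(data, data.length - Integer.BYTES, Integer.BYTES).getInt();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, String> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = readString(in);
                String value = readString(in);
                map.put(key, value);
            }
            return map;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // A length of -1 marks a null element.
    private static void writeString(DataOutputStream out, String s) throws IOException {
        if (s == null) {
            out.writeInt(-1);
        } else {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
        }
    }

    private static String readString(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put("id", "42");
        original.put("note", null);
        byte[] wire = encode(original);
        System.out.println(wire.length + " bytes, round trip ok: " + original.equals(decode(wire)));
    }
}
```

Putting the count at the end lets the writer stream entries without knowing the size up front; the reader has the whole array anyway, so it can look at the last four bytes first.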


Here's also a Serde implementation for Either<FST, SND> instances:


import org.apache.kafka.common.serialization.Serde;

import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

public final class EitherSerde<FST, SND> extends SelfSerde<Either<FST, SND>> {
    private final Serde<FST> fstSerde;
    private final Serde<SND> sndSerde;

    public EitherSerde(Serde<FST> fstSerde,
                       Serde<SND> sndSerde) {
        this.fstSerde = Objects.requireNonNull(fstSerde);
        this.sndSerde = Objects.requireNonNull(sndSerde);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        fstSerde.configure(configs, isKey);
        sndSerde.configure(configs, isKey);
    }

    @Override
    public void close() {
        fstSerde.close();
        sndSerde.close();
    }

    @Override
    public Either<FST, SND> deserialize(String topic, byte[] data) {
        if (data == null) return null;
        byte[] elementData = Arrays.copyOfRange(data, 0, data.length - 1);
        byte id = data[data.length - 1];
        switch (id) {
            case 0:
                return Either.first(fstSerde.deserializer().deserialize(topic, elementData));
            case 1:
                return Either.second(sndSerde.deserializer().deserialize(topic, elementData));
            default:
                throw new UncheckedIOException(new StreamCorruptedException("Expected 0 or 1 as last byte of Either"));
        }
    }

    @Override
    public byte[] serialize(String topic, Either<FST, SND> oneof2) {
        if (oneof2 == null) return null;
        byte[] elementData;
        byte id;
        if (oneof2.isFirst()) {
            elementData = fstSerde.serializer().serialize(topic, oneof2.getFirst());
            id = 0;
        } else {// oneof2.isSecond()
            elementData = sndSerde.serializer().serialize(topic, oneof2.getSecond());
            id = 1;
        }
        byte[] data = Arrays.copyOf(elementData, elementData.length + 1);
        data[data.length - 1] = id;
        return data;
    }
}
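
The framing used by EitherSerde boils down to "element bytes followed by one tag byte". A tiny sketch of just that step, with made-up names (TagByteSketch, tag, id, payload) and no Kafka dependency:

```java
import java.util.Arrays;

public class TagByteSketch {

    // Appends a one-byte discriminator (0 = first, 1 = second) after the element bytes.
    public static byte[] tag(byte[] payload, byte id) {
        byte[] data = Arrays.copyOf(payload, payload.length + 1);
        data[data.length - 1] = id;
        return data;
    }

    // The discriminator is always the last byte.
    public static byte id(byte[] data) {
        return data[data.length - 1];
    }

    // Everything before the last byte is handed unchanged to the element deserializer.
    public static byte[] payload(byte[] data) {
        return Arrays.copyOfRange(data, 0, data.length - 1);
    }

    public static void main(String[] args) {
        byte[] framed = tag(new byte[] {104, 105}, (byte) 0); // "hi" tagged as the first alternative
        System.out.println(Arrays.toString(framed));
    }
}
```

Stripping the tag before delegating matters here, because the map deserializer above expects its entry count in the last four bytes of the array it is given.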



You could then use the above utilities in the following way:


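import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Map;
import java.util.Properties;

        // One value serde for both alternatives: plain String or Map<String, String>
        SelfSerde<Either<String, Map<String, String>>> valSerde =
            new EitherSerde<>(
                Serdes.String(),
                new SelfSerde.HashMap<>(Serdes.String(), Serdes.String())
            );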

        Properties props = ...;

        Consumer<String, Either<String, Map<String, String>>> consumer =
            new KafkaConsumer<>(
                props,
                Serdes.String().deserializer(),
                valSerde
            );

// or:

        Properties props = ...;

        Producer<String, Either<String, Map<String, String>>> producer =
            new KafkaProducer<>(
                props,
                Serdes.String().serializer(),
                valSerde
            );



With custom (de)serialization you get compact messages and low CPU usage, which Java's default serialization can't give you. And you can still enjoy type-safe code :-)
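
To get a rough feel for the size difference, the following sketch encodes the same small map once with java.io.ObjectOutputStream and once with a length-prefixed layout like the one above. It is only an illustration: the class name SizeComparisonSketch and the sample data are made up, the hand-rolled encoder assumes non-null String keys and values, and it measures size only, not CPU usage.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SizeComparisonSketch {

    // Java default serialization: class metadata travels along with the data.
    public static byte[] javaSerialized(HashMap<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(map);
            }
            return baos.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // Hand-rolled layout: [int length][UTF-8 bytes] per key and value, entry count last.
    // Assumes keys and values are non-null.
    public static byte[] lengthPrefixed(Map<String, String> map) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            for (Map.Entry<String, String> e : map.entrySet()) {
                for (String s : new String[] {e.getKey(), e.getValue()}) {
                    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
            }
            out.writeInt(map.size());
            return baos.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("id", "42");
        map.put("name", "kafka");
        System.out.println("java serialization: " + javaSerialized(map).length + " bytes");
        System.out.println("length-prefixed:    " + lengthPrefixed(map).length + " bytes");
    }
}
```

The exact numbers depend on the data, so treat the output as an indication rather than a benchmark.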


Regards, Peter
