Hi Jun,

Thanks for pointing out this. Yes, putting serialization/deserialization
code into record does lose some flexibility. Some more thinking, I think
no matter what we do to bind the producer and serializer/deserializer, we
can always to the same thing on Record, i.e. We can also have some
constructor like ProducerRecor<Serializer<K, V>, Deserializer<K, V>>. The
downside of this is that we could potentially have a
serializer/deserializer instance for each record (that's actually the very
reason that I propose to put the code in record). This problem could be
addressed by either using a singleton class or factory for
serializer/deserializer library. But it might be a little bit complicated
and we are not able to enforce that to external library either. So it
seems only make sense if we really want to:
1. Have a single simple producer interface.
AND
2. use a single producer send all type of messages
I'm not sure if these requirement are strong enough to make us take the
complexity of singleton/factory class serializer/deserializer library.

Thanks.

Jiangjie (Becket) Qin

On 12/5/14, 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:

>Jiangjie,
>
>The issue with adding the serializer in ProducerRecord is that you need to
>implement all combinations of serializers for key and value. So, instead
>of
>just implementing int and string serializers, you will have to implement
>all 4 combinations.
>
>Adding a new producer constructor like Producer<K, V>(KeySerializer<K>,
>ValueSerializer<V>, Properties properties) can be useful.
>
>Thanks,
>
>Jun
>
>On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>>
>> I'm just thinking instead of binding serialization with producer,
>>another
>> option is to bind serializer/deserializer with
>> ProducerRecord/ConsumerRecord (please see the detail proposal below.)
>>            The arguments for this option is:
>>         A. A single producer could send different message types. There
>>are
>> several use cases in LinkedIn for per record serializer
>>         - In Samza, there are some in-stream order-sensitive control
>> messages
>> having different deserializer from other messages.
>>         - There are use cases which need support for sending both Avro
>> messages
>> and raw bytes.
>>         - Some use cases needs to deserialize some Avro messages into
>> generic
>> record and some other messages into specific record.
>>         B. In current proposal, the serializer/deserilizer is
>>instantiated
>> according to config. Compared with that, binding serializer with
>> ProducerRecord and ConsumerRecord is less error prone.
>>
>>
>>         This option includes the following changes:
>>         A. Add serializer and deserializer interfaces to replace
>>serializer
>> instance from config.
>>                 Public interface Serializer <K, V> {
>>                         public byte[] serializeKey(K key);
>>                         public byte[] serializeValue(V value);
>>                 }
>>                 Public interface deserializer <K, V> {
>>                         Public K deserializeKey(byte[] key);
>>                         public V deserializeValue(byte[] value);
>>                 }
>>
>>         B. Make ProducerRecord and ConsumerRecord abstract class
>> implementing
>> Serializer <K, V> and Deserializer <K, V> respectively.
>>                 Public abstract class ProducerRecord <K, V> implements
>> Serializer <K, V>
>> {...}
>>                 Public abstract class ConsumerRecord <K, V> implements
>> Deserializer <K,
>> V> {...}
>>
>>         C. Instead of instantiate the serializer/Deserializer from
>>config,
>> let
>> concrete ProducerRecord/ConsumerRecord extends the abstract class and
>> override the serialize/deserialize methods.
>>
>>                 Public class AvroProducerRecord extends ProducerRecord
>> <String,
>> GenericRecord> {
>>                         ...
>>                         @Override
>>                         Public byte[] serializeKey(String key) {Š}
>>                         @Override
>>                         public byte[] serializeValue(GenericRecord
>>value);
>>                 }
>>
>>                 Public class AvroConsumerRecord extends ConsumerRecord
>> <String,
>> GenericRecord> {
>>                         ...
>>                         @Override
>>                         Public K deserializeKey(byte[] key) {Š}
>>                         @Override
>>                         public V deserializeValue(byte[] value);
>>                 }
>>
>>         D. The producer API changes to
>>                 Public class KafkaProducer {
>>                         ...
>>
>>                         Future<RecordMetadata> send (ProducerRecord <K,
>>V>
>> record) {
>>                                 ...
>>                                 K key = record.serializeKey(record.key);
>>                                 V value =
>> record.serializedValue(record.value);
>>                                 BytesProducerRecord bytesProducerRecord
>>=
>> new
>> BytesProducerRecord(topic, partition, key, value);
>>                                 ...
>>                         }
>>                         ...
>>                 }
>>
>>
>>
>> We also had some brainstorm in LinkedIn and here are the feedbacks:
>>
>> If the community decide to add the serialization back to new producer,
>> besides current proposal which changes new producer API to be a
>>template,
>> there are some other options raised during our discussion:
>>         1) Rather than change current new producer API, we can provide a
>> wrapper
>> of current new producer (e.g. KafkaSerializedProducer) and make it
>> available to users. As there is value in the simplicity of current API.
>>
>>         2) If we decide to go with tempalated new producer API,
>>according
>> to
>> experience in LinkedIn, it might worth considering to instantiate the
>> serializer in code instead of from config so we can avoid runtime errors
>> due to dynamic instantiation from config, which is more error prone. If
>> that is the case, the producer API could be changed to something like:
>>                 producer = new Producer<K, V>(KeySerializer<K>,
>> ValueSerializer<V>)
>>
>> --Jiangjie (Becket) Qin
>>
>>
>> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
>>
>> >Hi, Everyone,
>> >
>> >I'd like to start a discussion on whether it makes sense to add the
>> >serializer api back to the new java producer. Currently, the new java
>> >producer takes a byte array for both the key and the value. While this
>>api
>> >is simple, it pushes the serialization logic into the application. This
>> >makes it hard to reason about what type of data is being sent to Kafka
>>and
>> >also makes it hard to share an implementation of the serializer. For
>> >example, to support Avro, the serialization logic could be quite
>>involved
>> >since it might need to register the Avro schema in some remote registry
>> >and
>> >maintain a schema cache locally, etc. Without a serialization api, it's
>> >impossible to share such an implementation so that people can easily
>> >reuse.
>> >We sort of overlooked this implication during the initial discussion of
>> >the
>> >producer api.
>> >
>> >So, I'd like to propose an api change to the new producer by adding
>>back
>> >the serializer api similar to what we had in the old producer.
>>Specially,
>> >the proposed api changes are the following.
>> >
>> >First, we change KafkaProducer to take generic types K and V for the
>>key
>> >and the value, respectively.
>> >
>> >public class KafkaProducer<K,V> implements Producer<K,V> {
>> >
>> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
>> >Callback
>> >callback);
>> >
>> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
>> >}
>> >
>> >Second, we add two new configs, one for the key serializer and another
>>for
>> >the value serializer. Both serializers will default to the byte array
>> >implementation.
>> >
>> >public class ProducerConfig extends AbstractConfig {
>> >
>> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
>> >"org.apache.kafka.clients.producer.ByteArraySerializer",
>>Importance.HIGH,
>> >KEY_SERIALIZER_CLASS_DOC)
>> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
>> >"org.apache.kafka.clients.producer.ByteArraySerializer",
>>Importance.HIGH,
>> >VALUE_SERIALIZER_CLASS_DOC);
>> >}
>> >
>> >Both serializers will implement the following interface.
>> >
>> >public interface Serializer<T> extends Configurable {
>> >    public byte[] serialize(String topic, T data, boolean isKey);
>> >
>> >    public void close();
>> >}
>> >
>> >This is more or less the same as what's in the old producer. The slight
>> >differences are (1) the serializer now only requires a parameter-less
>> >constructor; (2) the serializer has a configure() and a close() method
>>for
>> >initialization and cleanup, respectively; (3) the serialize() method
>> >additionally takes the topic and an isKey indicator, both of which are
>> >useful for things like schema registration.
>> >
>> >The detailed changes are included in KAFKA-1797. For completeness, I
>>also
>> >made the corresponding changes for the new java consumer api as well.
>> >
>> >Note that the proposed api changes are incompatible with what's in the
>> >0.8.2 branch. However, if those api changes are beneficial, it's
>>probably
>> >better to include them now in the 0.8.2 release, rather than later.
>> >
>> >I'd like to discuss mainly two things in this thread.
>> >1. Do people feel that the proposed api changes are reasonable?
>> >2. Are there any concerns of including the api changes in the 0.8.2
>>final
>> >release?
>> >
>> >Thanks,
>> >
>> >Jun
>>
>>

Reply via email to