Hi Jun,

Thanks for pointing this out. Yes, putting serialization/deserialization code into the record does lose some flexibility.

After some more thinking, I think no matter what we do to bind the serializer/deserializer to the producer, we can always do the same thing on the record, i.e. we could also have a constructor like ProducerRecord(Serializer<K, V>, Deserializer<K, V>). The downside of this is that we could potentially have a serializer/deserializer instance for each record (which is actually the very reason I proposed putting the code in the record). This problem could be addressed by using a singleton class or a factory for the serializer/deserializer library (a rough sketch follows below), but that is a little bit complicated, and we cannot enforce it on external libraries either. So it seems to only make sense if we really want to:

1. have a single, simple producer interface, AND
2. use a single producer to send all types of messages.
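To make that concrete, here is a minimal sketch of the record-bound binding plus a singleton serializer (all class and method names below are hypothetical, for illustration only):

    // The per-record serializer interface from the proposal below.
    public interface Serializer<K, V> {
        public byte[] serializeKey(K key);
        public byte[] serializeValue(V value);
    }

    // A singleton implementation avoids creating one serializer
    // instance per record.
    public final class StringSerializer implements Serializer<String, String> {
        private static final StringSerializer INSTANCE = new StringSerializer();
        private StringSerializer() {}
        public static StringSerializer getInstance() { return INSTANCE; }

        @Override
        public byte[] serializeKey(String key) {
            return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(String value) {
            return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    // The record takes the serializer in its constructor instead of the
    // producer reading one from config.
    public class ProducerRecord<K, V> {
        private final K key;
        private final V value;
        private final Serializer<K, V> serializer;

        public ProducerRecord(K key, V value, Serializer<K, V> serializer) {
            this.key = key;
            this.value = value;
            this.serializer = serializer;
        }

        public byte[] serializedKey() { return serializer.serializeKey(key); }
        public byte[] serializedValue() { return serializer.serializeValue(value); }
    }

    // Usage: the singleton is shared across all records.
    // new ProducerRecord<>("key", "value", StringSerializer.getInstance());

The singleton keeps us at one serializer instance per type rather than one per record, but as noted above, nothing forces an external serializer library to follow this pattern.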
I'm not sure if these requirements are strong enough to justify taking on the complexity of a singleton/factory serializer/deserializer library.

Thanks,

Jiangjie (Becket) Qin

On 12/5/14, 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:

>Jiangjie,
>
>The issue with adding the serializer in ProducerRecord is that you need to
>implement all combinations of serializers for the key and the value. So,
>instead of just implementing int and string serializers, you will have to
>implement all 4 combinations.
>
>Adding a new producer constructor like Producer<K, V>(KeySerializer<K>,
>ValueSerializer<V>, Properties properties) can be useful.
>
>Thanks,
>
>Jun
>
>On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>>
>> I'm just thinking that instead of binding serialization with the
>> producer, another option is to bind the serializer/deserializer with
>> ProducerRecord/ConsumerRecord (please see the detailed proposal below).
>> The arguments for this option are:
>>         A. A single producer could send different message types. There
>> are several use cases at LinkedIn for a per-record serializer:
>>         - In Samza, there are some in-stream, order-sensitive control
>> messages having a different deserializer from other messages.
>>         - There are use cases which need support for sending both Avro
>> messages and raw bytes.
>>         - Some use cases need to deserialize some Avro messages into
>> generic records and some other messages into specific records.
>>         B. In the current proposal, the serializer/deserializer is
>> instantiated according to config. Compared with that, binding the
>> serializer with ProducerRecord and ConsumerRecord is less error prone.
>>
>>
>> This option includes the following changes:
>>         A. Add serializer and deserializer interfaces to replace the
>> serializer instance from config.
>>         public interface Serializer<K, V> {
>>                 public byte[] serializeKey(K key);
>>                 public byte[] serializeValue(V value);
>>         }
>>         public interface Deserializer<K, V> {
>>                 public K deserializeKey(byte[] key);
>>                 public V deserializeValue(byte[] value);
>>         }
>>
>>         B. Make ProducerRecord and ConsumerRecord abstract classes
>> implementing Serializer<K, V> and Deserializer<K, V> respectively.
>>         public abstract class ProducerRecord<K, V> implements
>> Serializer<K, V> {...}
>>         public abstract class ConsumerRecord<K, V> implements
>> Deserializer<K, V> {...}
>>
>>         C. Instead of instantiating the serializer/deserializer from
>> config, let concrete ProducerRecord/ConsumerRecord classes extend the
>> abstract class and override the serialize/deserialize methods.
>>
>>         public class AvroProducerRecord extends ProducerRecord<String,
>> GenericRecord> {
>>                 ...
>>                 @Override
>>                 public byte[] serializeKey(String key) {...}
>>                 @Override
>>                 public byte[] serializeValue(GenericRecord value) {...}
>>         }
>>
>>         public class AvroConsumerRecord extends ConsumerRecord<String,
>> GenericRecord> {
>>                 ...
>>                 @Override
>>                 public String deserializeKey(byte[] key) {...}
>>                 @Override
>>                 public GenericRecord deserializeValue(byte[] value) {...}
>>         }
>>
>>         D. The producer API changes to:
>>         public class KafkaProducer {
>>                 ...
>>                 public Future<RecordMetadata> send(ProducerRecord<K, V>
>> record) {
>>                         ...
>>                         byte[] key = record.serializeKey(record.key);
>>                         byte[] value = record.serializeValue(record.value);
>>                         BytesProducerRecord bytesProducerRecord = new
>> BytesProducerRecord(topic, partition, key, value);
>>                         ...
>>                 }
>>                 ...
>>         }
>>
>>
>> We also had some brainstorming at LinkedIn, and here is the feedback:
>>
>> If the community decides to add serialization back to the new producer,
>> besides the current proposal, which changes the new producer API to be
>> templated, there are some other options raised during our discussion:
>>         1) Rather than changing the current new producer API, we can
>> provide a wrapper of the current new producer (e.g.
>> KafkaSerializedProducer) and make it available to users, as there is
>> value in the simplicity of the current API.
>>
>>         2) If we decide to go with a templated new producer API,
>> according to experience at LinkedIn, it might be worth considering
>> instantiating the serializer in code instead of from config, so we can
>> avoid runtime errors due to dynamic instantiation from config, which is
>> more error prone. If that is the case, the producer API could be
>> changed to something like:
>>         producer = new Producer<K, V>(KeySerializer<K>,
>> ValueSerializer<V>)
>>
>> --Jiangjie (Becket) Qin
>>
>>
>> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
>>
>> >Hi, Everyone,
>> >
>> >I'd like to start a discussion on whether it makes sense to add the
>> >serializer api back to the new java producer. Currently, the new java
>> >producer takes a byte array for both the key and the value. While this
>> >api is simple, it pushes the serialization logic into the application.
>> >This makes it hard to reason about what type of data is being sent to
>> >Kafka and also makes it hard to share an implementation of the
>> >serializer. For example, to support Avro, the serialization logic
>> >could be quite involved since it might need to register the Avro
>> >schema in some remote registry and maintain a schema cache locally,
>> >etc. Without a serialization api, it's impossible to share such an
>> >implementation so that people can easily reuse it. We sort of
>> >overlooked this implication during the initial discussion of the
>> >producer api.
>> >
>> >So, I'd like to propose an api change to the new producer by adding
>> >back the serializer api similar to what we had in the old producer.
>> >Specifically, the proposed api changes are the following.
>> >
>> >First, we change KafkaProducer to take generic types K and V for the
>> >key and the value, respectively.
>> >
>> >public class KafkaProducer<K,V> implements Producer<K,V> {
>> >
>> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
>> >Callback callback);
>> >
>> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
>> >}
>> >
>> >Second, we add two new configs, one for the key serializer and another
>> >for the value serializer. Both serializers will default to the byte
>> >array implementation.
>> >
>> >public class ProducerConfig extends AbstractConfig {
>> >
>> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
>> >"org.apache.kafka.clients.producer.ByteArraySerializer",
>> >Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
>> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
>> >"org.apache.kafka.clients.producer.ByteArraySerializer",
>> >Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
>> >}
>> >
>> >Both serializers will implement the following interface.
>> >
>> >public interface Serializer<T> extends Configurable {
>> >    public byte[] serialize(String topic, T data, boolean isKey);
>> >
>> >    public void close();
>> >}
>> >
>> >This is more or less the same as what's in the old producer.
>> >The slight differences are (1) the serializer now only requires a
>> >parameter-less constructor; (2) the serializer has a configure() and a
>> >close() method for initialization and cleanup, respectively; (3) the
>> >serialize() method additionally takes the topic and an isKey
>> >indicator, both of which are useful for things like schema
>> >registration.
>> >
>> >The detailed changes are included in KAFKA-1797. For completeness, I
>> >also made the corresponding changes for the new java consumer api as
>> >well.
>> >
>> >Note that the proposed api changes are incompatible with what's in the
>> >0.8.2 branch. However, if those api changes are beneficial, it's
>> >probably better to include them now in the 0.8.2 release, rather than
>> >later.
>> >
>> >I'd like to discuss mainly two things in this thread.
>> >1. Do people feel that the proposed api changes are reasonable?
>> >2. Are there any concerns about including the api changes in the 0.8.2
>> >final release?
>> >
>> >Thanks,
>> >
>> >Jun
>>
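[For reference, here is a minimal sketch of a serializer under the proposed Serializer<T> interface. The StringSerializer class below is hypothetical, and it assumes Configurable declares configure(Map<String, ?> configs); it only illustrates the proposed api, not the actual KAFKA-1797 patch.]

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class StringSerializer implements Serializer<String> {

        // (1) Only a parameter-less constructor is required.
        public StringSerializer() {}

        // (2) configure() comes from Configurable and is called once
        // with the producer config; a plain string serializer has
        // nothing to pick up.
        @Override
        public void configure(Map<String, ?> configs) {}

        // (3) topic and isKey are available for things like schema
        // registration; they are unused here.
        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }

        // (2) close() is for cleanup; nothing to release here.
        @Override
        public void close() {}
    }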