(sorry about the late follow-up - I'm traveling most of this month) I'm likely missing something obvious, but I find the point quoted below somewhat vague; it has been made more than once in this thread without a clear explanation. That is, why is it hard to share a serializer/deserializer implementation and simply have the clients call it before a send/receive? What "usage pattern" cannot be supported by the simpler API?
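(To make the question concrete: with the bytes in / bytes out API, I would expect shared serialization to look roughly like the sketch below. StringSerializer, AvroSerializer, and the in-scope userId, avroRecord, and producer are all hypothetical, not from this thread.)

    // Hypothetical shared serializer classes, distributed as a library jar.
    StringSerializer keySerializer = new StringSerializer();
    AvroSerializer valueSerializer = new AvroSerializer("http://schema-registry:8081");

    // Each application serializes explicitly, then hands bytes to the producer.
    byte[] keyBytes = keySerializer.serialize(userId);
    byte[] valueBytes = valueSerializer.serialize(avroRecord);
    producer.send(new ProducerRecord("my-topic", keyBytes, valueBytes));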
> 1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (This is what we have today)?
>
> The points for this are to keep the interface simple and usage easy to understand. The point against is that it gets hard to share common usage patterns around serialization/message validation in the future.

On Tue, Dec 09, 2014 at 03:51:08AM +0000, Sriram Subramanian wrote:
> Thank you Jay. I agree with the issue that you point out w.r.t. paired serializers. I also think that mixing serialization types is rare. To get the current behavior, one can simply use a ByteArraySerializer. This is best understood by talking with many customers, and you seem to have done that. I am convinced about the change.
>
> For the rest who gave -1 or 0 for this proposal, do the answers for the three points (updated) below seem reasonable? Are these explanations convincing?
>
> 1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (This is what we have today)?
>
> The points for this are to keep the interface simple and usage easy to understand. The point against is that it gets hard to share common usage patterns around serialization/message validation in the future.
>
> 2. Can we create a wrapper producer that does the serialization and have different variants of it for different data formats?
>
> The point for this is again to keep the main API clean. The points against it are that it duplicates the API, increases the surface area, and creates redundancy for a minor addition.
>
> 3. Do we need to support different data types per record? The current interface (bytes in / bytes out) lets you instantiate one producer and use it to send multiple data formats. There seem to be some valid use cases for this.
>
> Mixed serialization types are rare based on interactions with customers. To get the current behavior, one can simply use a ByteArraySerializer.
>
> On 12/5/14 5:00 PM, "Jay Kreps" <j...@confluent.io> wrote:
>
> >Hey Sriram,
> >
> >Thanks! I think this is a very helpful summary.
> >
> >Let me try to address your point about passing in the serde at send time.
> >
> >I think the first objection is really to the paired key/value serializer interfaces. This leads to kind of a weird combinatorial thing where you would have an avro/avro serializer, a string/avro serializer, a pb/pb serializer, a string/pb serializer, and so on. But your proposal would work as well with separate serializers for key and value.
> >
> >I think the downside is just the one you call out--that this is a corner case and you end up with two versions of all the apis to support it. This also makes the serializer api more annoying to implement. I think the alternative solution we can give people for this case, and any other, is just configuring ByteArraySerializer, which gives you basically the api that you have now with byte arrays. If this were incredibly common it would be a silly solution, but I guess the belief is that these cases are rare and a really well implemented avro or json serializer should be 100% of what most people need.
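(For readers skimming the thread, a minimal sketch of the ByteArraySerializer escape hatch mentioned above. The serializer class name is taken verbatim from the proposal below; the "key.serializer"/"value.serializer" config key strings and the in-scope keyBytes/valueBytes are assumptions.)

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    // Byte-array serializers restore today's bytes in / bytes out behavior.
    props.put("key.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");

    Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
    producer.send(new ProducerRecord<byte[], byte[]>("my-topic", keyBytes, valueBytes));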
> >In practice, the cases that actually mix serialization types in a single stream are pretty rare, I think, just because the consumer then has the problem of guessing how to deserialize, so most of these will end up with at least some marker or schema id or whatever that tells you how to read the data. Arguably, this mixed serialization with a marker is itself a serializer type and should have a serializer of its own...
> >
> >-Jay
> >
> >On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <srsubraman...@linkedin.com.invalid> wrote:
> >
> >> This thread has diverged multiple times now and it would be worth summarizing the points of discussion.
> >>
> >> There seem to be the following points of discussion -
> >>
> >> 1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (This is what we have today)?
> >>
> >> The points for this are to keep the interface simple and usage easy to understand. The point against is that it gets hard to share common usage patterns around serialization/message validation in the future.
> >>
> >> 2. Can we create a wrapper producer that does the serialization and have different variants of it for different data formats?
> >>
> >> The point for this is again to keep the main API clean. The points against it are that it duplicates the API, increases the surface area, and creates redundancy for a minor addition.
> >>
> >> 3. Do we need to support different data types per record? The current interface (bytes in / bytes out) lets you instantiate one producer and use it to send multiple data formats. There seem to be some valid use cases for this.
> >>
> >> I have still not seen a strong argument for dropping this functionality. Can someone provide their views on why we don't need this support, which is possible with the current API?
> >>
> >> One possible approach for the per-record serialization would be to define
> >>
> >> public interface SerDe<K, V> {
> >>     public byte[] serializeKey(K key);
> >>
> >>     public K deserializeKey(byte[] key);
> >>
> >>     public byte[] serializeValue(V value);
> >>
> >>     public V deserializeValue(byte[] value);
> >> }
> >>
> >> This would be used by both the Producer and the Consumer.
> >>
> >> The send APIs can then be
> >>
> >> public Future<RecordMetadata> send(ProducerRecord<K, V> record);
> >> public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
> >>
> >> public Future<RecordMetadata> send(ProducerRecord<K, V> record, SerDe<K, V> serde);
> >> public Future<RecordMetadata> send(ProducerRecord<K, V> record, SerDe<K, V> serde, Callback callback);
> >>
> >> A default SerDe can be set in the config. The producer would use the default from the config if the non-serde send APIs are used. The downside to this approach is that we would need four variants of the send API on the Producer.
> >>
> >> On 12/5/14 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:
> >>
> >> >Jiangjie,
> >> >
> >> >The issue with adding the serializer in ProducerRecord is that you need to implement all combinations of serializers for the key and the value. So, instead of just implementing int and string serializers, you would have to implement all 4 combinations.
> >> >
> >> >Adding a new producer constructor like Producer<K, V>(KeySerializer<K>, ValueSerializer<V>, Properties properties) can be useful.
> >> >
> >> >Thanks,
> >> >
> >> >Jun
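(A short sketch of the separate key/value serializer shape Jun suggests, to show why it avoids the pairing combinatorics; these interface names are assumptions for illustration, not from a patch.)

    // One serializer per data format; key and value serializers are composed
    // freely, so N formats need N implementations rather than N x N pairs.
    public interface KeySerializer<K> {
        byte[] serialize(K key);
    }

    public interface ValueSerializer<V> {
        byte[] serialize(V value);
    }

Usage with the suggested constructor would then be something like new Producer<String, GenericRecord>(new StringKeySerializer(), new AvroValueSerializer(), properties), where StringKeySerializer and AvroValueSerializer are hypothetical implementations.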
> >> >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> >
> >> >> I'm just thinking that instead of binding serialization to the producer, another option is to bind the serializer/deserializer to ProducerRecord/ConsumerRecord (please see the detailed proposal below). The arguments for this option are:
> >> >>
> >> >> A. A single producer could send different message types. There are several use cases at LinkedIn for a per-record serializer:
> >> >>    - In Samza, there are some in-stream, order-sensitive control messages that have a different deserializer from other messages.
> >> >>    - There are use cases which need support for sending both Avro messages and raw bytes.
> >> >>    - Some use cases need to deserialize some Avro messages into generic records and other messages into specific records.
> >> >> B. In the current proposal, the serializer/deserializer is instantiated from config. Compared with that, binding the serializer to ProducerRecord and ConsumerRecord is less error prone.
> >> >>
> >> >> This option includes the following changes:
> >> >>
> >> >> A. Add serializer and deserializer interfaces to replace the serializer instance from config:
> >> >>
> >> >> public interface Serializer<K, V> {
> >> >>     public byte[] serializeKey(K key);
> >> >>     public byte[] serializeValue(V value);
> >> >> }
> >> >>
> >> >> public interface Deserializer<K, V> {
> >> >>     public K deserializeKey(byte[] key);
> >> >>     public V deserializeValue(byte[] value);
> >> >> }
> >> >>
> >> >> B. Make ProducerRecord and ConsumerRecord abstract classes implementing Serializer<K, V> and Deserializer<K, V>, respectively:
> >> >>
> >> >> public abstract class ProducerRecord<K, V> implements Serializer<K, V> { ... }
> >> >>
> >> >> public abstract class ConsumerRecord<K, V> implements Deserializer<K, V> { ... }
> >> >>
> >> >> C. Instead of instantiating the serializer/deserializer from config, let concrete ProducerRecord/ConsumerRecord classes extend the abstract classes and override the serialize/deserialize methods:
> >> >>
> >> >> public class AvroProducerRecord extends ProducerRecord<String, GenericRecord> {
> >> >>     ...
> >> >>     @Override
> >> >>     public byte[] serializeKey(String key) { ... }
> >> >>
> >> >>     @Override
> >> >>     public byte[] serializeValue(GenericRecord value) { ... }
> >> >> }
> >> >>
> >> >> public class AvroConsumerRecord extends ConsumerRecord<String, GenericRecord> {
> >> >>     ...
> >> >>     @Override
> >> >>     public String deserializeKey(byte[] key) { ... }
> >> >>
> >> >>     @Override
> >> >>     public GenericRecord deserializeValue(byte[] value) { ... }
> >> >> }
> >> >>
> >> >> D. The producer API changes to:
> >> >>
> >> >> public class KafkaProducer {
> >> >>     ...
> >> >>     public <K, V> Future<RecordMetadata> send(ProducerRecord<K, V> record) {
> >> >>         ...
> >> >>         byte[] keyBytes = record.serializeKey(record.key());
> >> >>         byte[] valueBytes = record.serializeValue(record.value());
> >> >>         BytesProducerRecord bytesProducerRecord = new BytesProducerRecord(topic, partition, keyBytes, valueBytes);
> >> >>         ...
> >> >>     }
> >> >>     ...
> >> >> }
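(To illustrate argument A above - one producer sending different message types - a hypothetical usage sketch under this record-bound proposal; RawBytesProducerRecord is an assumed sibling of AvroProducerRecord.)

    // One producer instance; each record type carries its own serializer.
    KafkaProducer producer = new KafkaProducer(props);
    producer.send(new AvroProducerRecord("events", memberId, avroPayload));
    producer.send(new RawBytesProducerRecord("events", memberId, rawPayload));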
> >> >>
> >> >> We also brainstormed at LinkedIn, and here is the feedback:
> >> >>
> >> >> If the community decides to add serialization back to the new producer, besides the current proposal, which changes the new producer API to be templated, there are some other options raised during our discussion:
> >> >>
> >> >> 1) Rather than changing the current new producer API, we could provide a wrapper around it (e.g. KafkaSerializedProducer) and make that available to users, as there is value in the simplicity of the current API.
> >> >>
> >> >> 2) If we decide to go with a templated new producer API, based on our experience at LinkedIn it might be worth considering instantiating the serializer in code instead of from config, so we can avoid runtime errors due to dynamic instantiation from config, which is more error prone. In that case, the producer API could be changed to something like:
> >> >>
> >> >>     producer = new Producer<K, V>(KeySerializer<K>, ValueSerializer<V>)
> >> >>
> >> >> --Jiangjie (Becket) Qin
> >> >>
> >> >> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
> >> >>
> >> >> >Hi, Everyone,
> >> >> >
> >> >> >I'd like to start a discussion on whether it makes sense to add the serializer api back to the new java producer. Currently, the new java producer takes a byte array for both the key and the value. While this api is simple, it pushes the serialization logic into the application. This makes it hard to reason about what type of data is being sent to Kafka and also makes it hard to share an implementation of the serializer. For example, to support Avro, the serialization logic could be quite involved, since it might need to register the Avro schema in some remote registry and maintain a schema cache locally, etc. Without a serialization api, it's impossible to share such an implementation so that people can easily reuse it. We sort of overlooked this implication during the initial discussion of the producer api.
> >> >> >
> >> >> >So, I'd like to propose an api change to the new producer by adding back the serializer api, similar to what we had in the old producer. Specifically, the proposed api changes are the following.
> >> >> >
> >> >> >First, we change KafkaProducer to take generic types K and V for the key and the value, respectively.
> >> >> >
> >> >> >public class KafkaProducer<K, V> implements Producer<K, V> {
> >> >> >
> >> >> >    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
> >> >> >
> >> >> >    public Future<RecordMetadata> send(ProducerRecord<K, V> record);
> >> >> >}
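(Under this change, application code would hand typed objects straight to the producer; a hedged sketch, assuming an Avro value serializer has been configured and pageView is an in-scope GenericRecord.)

    // The configured serializers turn domain objects into bytes internally.
    Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
    producer.send(new ProducerRecord<String, GenericRecord>("page-views", "user-42", pageView));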
> >> >> >
> >> >> >Second, we add two new configs, one for the key serializer and another for the value serializer. Both serializers will default to the byte array implementation.
> >> >> >
> >> >> >public class ProducerConfig extends AbstractConfig {
> >> >> >
> >> >> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> >> >> >            "org.apache.kafka.clients.producer.ByteArraySerializer",
> >> >> >            Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
> >> >> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> >> >> >            "org.apache.kafka.clients.producer.ByteArraySerializer",
> >> >> >            Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
> >> >> >}
> >> >> >
> >> >> >Both serializers will implement the following interface.
> >> >> >
> >> >> >public interface Serializer<T> extends Configurable {
> >> >> >    public byte[] serialize(String topic, T data, boolean isKey);
> >> >> >
> >> >> >    public void close();
> >> >> >}
> >> >> >
> >> >> >This is more or less the same as what's in the old producer. The slight differences are: (1) the serializer now only requires a parameter-less constructor; (2) the serializer has a configure() and a close() method for initialization and cleanup, respectively; (3) the serialize() method additionally takes the topic and an isKey indicator, both of which are useful for things like schema registration.
> >> >> >
> >> >> >The detailed changes are included in KAFKA-1797. For completeness, I also made the corresponding changes for the new java consumer api.
> >> >> >
> >> >> >Note that the proposed api changes are incompatible with what's in the 0.8.2 branch. However, if those api changes are beneficial, it's probably better to include them now, in the 0.8.2 release, rather than later.
> >> >> >
> >> >> >I'd like to discuss mainly two things in this thread.
> >> >> >1. Do people feel that the proposed api changes are reasonable?
> >> >> >2. Are there any concerns about including the api changes in the 0.8.2 final release?
> >> >> >
> >> >> >Thanks,
> >> >> >
> >> >> >Jun
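(For concreteness, a minimal sketch of what an implementation of the proposed Serializer<T> interface could look like; StringSerializer and its "encoding" config key are illustrative, not part of KAFKA-1797, and configure(Map<String, ?>) is assumed to come from Configurable.)

    // Illustrative serializer under the proposed interface. A no-arg
    // constructor is all the producer needs to instantiate it from config.
    public class StringSerializer implements Serializer<String> {
        private String encoding = "UTF-8";

        @Override
        public void configure(Map<String, ?> configs) {
            // Optionally override the charset via an (assumed) "encoding" config.
            Object enc = configs.get("encoding");
            if (enc instanceof String)
                encoding = (String) enc;
        }

        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            try {
                return data == null ? null : data.getBytes(encoding);
            } catch (java.io.UnsupportedEncodingException e) {
                throw new RuntimeException("Unsupported encoding " + encoding, e);
            }
        }

        @Override
        public void close() {
            // Nothing to clean up.
        }
    }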