Joel,

With a byte array interface, of course there is nothing that one can't do. However, the real question is whether we want to encourage people to use it this way. Being able to flow just bytes definitely makes it easier to get started, which is why many early adopters choose to do it that way. However, they often start feeling the pain later, when some producers change the data format: their Hive/Pig queries start to break, and getting the issue fixed is a painful process. So, the purpose of this api change is really to encourage people to standardize on a single serializer/deserializer that supports things like data validation and schema evolution upstream in the producer.

Now, suppose there is an Avro serializer/deserializer implementation. How do we make it easy for people to adopt? If the serializer is part of the api, we can just say: wire in the Avro serializer for the key and/or value in the config, and then you can start sending Avro records to the producer. If the serializer is not part of the api, we have to say: first instantiate a key and/or value serializer this way, send the key to the key serializer to get the key bytes, send the value to the value serializer to get the value bytes, and finally send the bytes to the producer. The former is simpler and will likely make adoption easier.
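
To make the contrast concrete, here is a minimal sketch of the two styles. It is only an illustration under assumptions: Serializer, Utf8Serializer, BytesProducer and TypedProducer are hypothetical stand-ins written for this example (they are not the classes in KAFKA-1797), the toy UTF-8 serializer stands in for a shared Avro one, and the serializers are passed to a constructor rather than named in the config, purely to keep the sketch self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializerStylesDemo {

    /** Hypothetical, simplified form of the serializer interface discussed in this thread. */
    public interface Serializer<T> {
        byte[] serialize(String topic, T data, boolean isKey);
    }

    /** Toy stand-in for a shared serializer implementation (for example an Avro one). */
    public static class Utf8Serializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Stand-in for a bytes-in/bytes-out producer; it only records what was sent. */
    public static class BytesProducer {
        public final List<byte[][]> sent = new ArrayList<>();

        public void send(String topic, byte[] key, byte[] value) {
            sent.add(new byte[][] {key, value});
        }
    }

    /** Stand-in for a producer that has the serializers wired in. */
    public static class TypedProducer<K, V> {
        private final Serializer<K> keySerializer;
        private final Serializer<V> valueSerializer;
        public final BytesProducer delegate = new BytesProducer();

        public TypedProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }

        public void send(String topic, K key, V value) {
            // The producer, not the caller, turns typed records into bytes.
            delegate.send(topic,
                    keySerializer.serialize(topic, key, true),
                    valueSerializer.serialize(topic, value, false));
        }
    }

    public static void main(String[] args) {
        // Style 1: serializer outside the api. Every caller repeats these steps.
        Utf8Serializer keySerializer = new Utf8Serializer();
        Utf8Serializer valueSerializer = new Utf8Serializer();
        BytesProducer bytesProducer = new BytesProducer();
        byte[] keyBytes = keySerializer.serialize("events", "user-1", true);
        byte[] valueBytes = valueSerializer.serialize("events", "clicked", false);
        bytesProducer.send("events", keyBytes, valueBytes);

        // Style 2: serializer part of the api. The caller sends typed records.
        TypedProducer<String, String> typedProducer =
                new TypedProducer<>(new Utf8Serializer(), new Utf8Serializer());
        typedProducer.send("events", "user-1", "clicked");

        // Both styles put the same bytes on the wire; prints "true".
        System.out.println(Arrays.equals(
                bytesProducer.sent.get(0)[1], typedProducer.delegate.sent.get(0)[1]));
    }
}
```

Both paths produce identical bytes; the difference is only in how many steps each application has to get right on its own.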

Thanks,

Jun

On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> Documentation is inevitable even if the serializer/deserializer is
> part of the API - since the user has to set it up in the configs. So
> again, you can only encourage people to use it through documentation.
> The simpler byte-oriented API seems clearer to me because anyone who
> needs to send (or receive) a specific data type will _be forced to_
> (or actually, _intuitively_) select a serializer (or deserializer) and
> will definitely pick an already available implementation if a good one
> already exists.
>
> Sorry I still don't get it and this is really the only sticking point
> for me, albeit a minor one (which is why I have been +0 all along on
> the change). I (and I think many others) would appreciate it if
> someone can help me understand this better. So I will repeat the
> question: What "usage pattern" cannot be supported easily by the
> simpler API without adding burden on the user?
>
> Thanks,
>
> Joel
>
> On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote:
> > Joel,
> >
> > It's just that if the serializer/deserializer is not part of the API, you
> > can only encourage people to use it through documentation. However, not
> > everyone will read the documentation if it's not directly used in the API.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > > (sorry about the late follow-up - I'm traveling most of this
> > > month)
> > >
> > > I'm likely missing something obvious, but I find the following to be a
> > > somewhat vague point that has been mentioned more than once in this
> > > thread without a clear explanation. i.e., why is it hard to share a
> > > serializer/deserializer implementation and just have the clients call
> > > it before a send/receive? What "usage pattern" cannot be supported by
> > > the simpler API?
> > >
> > > > 1. Can we keep the serialization semantics outside the Producer interface
> > > > and have simple bytes in / bytes out for the interface (This is what we
> > > > have today).
> > > >
> > > > The point for this is to keep the interface simple and usage easy to
> > > > understand. The point against this is that it gets hard to share common
> > > > usage patterns around serialization/message validations for the future.
> > >
> > >
> > > On Tue, Dec 09, 2014 at 03:51:08AM +0000, Sriram Subramanian wrote:
> > > > Thank you Jay. I agree with the issue that you point out w.r.t. paired
> > > > serializers. I also think having mixed serialization types is rare. To get
> > > > the current behavior, one can simply use a ByteArraySerializer. This is
> > > > best understood by talking with many customers and you seem to have done
> > > > that. I am convinced about the change.
> > > >
> > > > For the rest who gave -1 or 0 for this proposal, do the answers for the
> > > > three points (updated) below seem reasonable? Are these explanations
> > > > convincing?
> > > >
> > > >
> > > > 1. Can we keep the serialization semantics outside the Producer interface
> > > > and have simple bytes in / bytes out for the interface (This is what we
> > > > have today).
> > > >
> > > > The point for this is to keep the interface simple and usage easy to
> > > > understand. The point against this is that it gets hard to share common
> > > > usage patterns around serialization/message validations for the future.
> > > >
> > > > 2. Can we create a wrapper producer that does the serialization and have
> > > > different variants of it for different data formats?
> > > >
> > > > The point for this is again to keep the main API clean. The point
> > > > against this is that it duplicates the API, increases the surface area and
> > > > creates redundancy for a minor addition.
> > > >
> > > > 3. Do we need to support different data types per record? The current
> > > > interface (bytes in/bytes out) lets you instantiate one producer and use
> > > > it to send multiple data formats. There seem to be some valid use cases
> > > > for this.
> > > >
> > > >
> > > > Mixed serialization types are rare based on interactions with customers.
> > > > To get the current behavior, one can simply use a ByteArraySerializer.
> > > >
> > > > On 12/5/14 5:00 PM, "Jay Kreps" <j...@confluent.io> wrote:
> > > >
> > > > >Hey Sriram,
> > > > >
> > > > >Thanks! I think this is a very helpful summary.
> > > > >
> > > > >Let me try to address your point about passing in the serde at send time.
> > > > >
> > > > >I think the first objection is really to the paired key/value serializer
> > > > >interfaces. This leads to kind of a weird combinatorial thing where you
> > > > >would have an avro/avro serializer, a string/avro serializer, a pb/pb
> > > > >serializer, and a string/pb serializer, and so on. But your proposal would
> > > > >work as well with separate serializers for key and value.
> > > > >
> > > > >I think the downside is just the one you call out--that this is a corner
> > > > >case and you end up with two versions of all the apis to support it. This
> > > > >also makes the serializer api more annoying to implement. I think the
> > > > >alternative solution to this case and any other we can give people is just
> > > > >configuring ByteArraySerializer which gives you basically the api that you
> > > > >have now with byte arrays. If this is incredibly common then this would be
> > > > >a silly solution, but I guess the belief is that these cases are rare and
> > > > >a really well implemented avro or json serializer should be 100% of what
> > > > >most people need.
> > > > >
> > > > >In practice the cases that actually mix serialization types in a single
> > > > >stream are pretty rare I think just because the consumer then has the
> > > > >problem of guessing how to deserialize, so most of these will end up with
> > > > >at least some marker or schema id or whatever that tells you how to read
> > > > >the data. Arguably this mixed serialization with marker is itself a
> > > > >serializer type and should have a serializer of its own...
> > > > >
> > > > >-Jay
> > > > >
> > > > >On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian <
> > > > >srsubraman...@linkedin.com.invalid> wrote:
> > > > >
> > > > >> This thread has diverged multiple times now and it would be worth
> > > > >> summarizing them.
> > > > >>
> > > > >> There seem to be the following points of discussion -
> > > > >>
> > > > >> 1. Can we keep the serialization semantics outside the Producer interface
> > > > >> and have simple bytes in / bytes out for the interface (This is what we
> > > > >> have today).
> > > > >>
> > > > >> The point for this is to keep the interface simple and usage easy to
> > > > >> understand. The point against this is that it gets hard to share common
> > > > >> usage patterns around serialization/message validations for the future.
> > > > >>
> > > > >> 2. Can we create a wrapper producer that does the serialization and have
> > > > >> different variants of it for different data formats?
> > > > >>
> > > > >> The point for this is again to keep the main API clean. The point
> > > > >> against this is that it duplicates the API, increases the surface area and
> > > > >> creates redundancy for a minor addition.
> > > > >>
> > > > >> 3. Do we need to support different data types per record? The current
> > > > >> interface (bytes in/bytes out) lets you instantiate one producer and use
> > > > >> it to send multiple data formats. There seem to be some valid use cases
> > > > >> for this.
> > > > >>
> > > > >> I have still not seen a strong argument for not having this
> > > > >> functionality. Can someone provide their views on why we don't need this
> > > > >> support that is possible with the current API?
> > > > >>
> > > > >> One possible approach for the per record serialization would be to define
> > > > >>
> > > > >> public interface SerDe<K,V> {
> > > > >>   public byte[] serializeKey();
> > > > >>
> > > > >>   public K deserializeKey();
> > > > >>
> > > > >>   public byte[] serializeValue();
> > > > >>
> > > > >>   public V deserializeValue();
> > > > >> }
> > > > >>
> > > > >> This would be used by both the Producer and the Consumer.
> > > > >>
> > > > >> The send APIs can then be
> > > > >>
> > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
> > > > >> callback);
> > > > >>
> > > > >>
> > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V>
> > > > >> serde);
> > > > >>
> > > > >> public Future<RecordMetadata> send(ProducerRecord<K,V> record, SerDe<K,V>
> > > > >> serde, Callback callback);
> > > > >>
> > > > >>
> > > > >> A default SerDe can be set in the config. The producer would use the
> > > > >> default from the config if the non-serde send APIs are used. The downside
> > > > >> to this approach is that we would need to have four variants of the send
> > > > >> API for the Producer.
> > > > >>
> > > > >>
> > > > >> On 12/5/14 3:16 PM, "Jun Rao" <j...@confluent.io> wrote:
> > > > >>
> > > > >> >Jiangjie,
> > > > >> >
> > > > >> >The issue with adding the serializer in ProducerRecord is that you need to
> > > > >> >implement all combinations of serializers for key and value. So, instead of
> > > > >> >just implementing int and string serializers, you will have to implement
> > > > >> >all 4 combinations.
> > > > >> >
> > > > >> >Adding a new producer constructor like Producer<K, V>(KeySerializer<K>,
> > > > >> >ValueSerializer<V>, Properties properties) can be useful.
> > > > >> >
> > > > >> >Thanks,
> > > > >> >
> > > > >> >Jun
> > > > >> >
> > > > >> >On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> > > > >> >wrote:
> > > > >> >
> > > > >> >>
> > > > >> >> I'm just thinking instead of binding serialization with the producer,
> > > > >> >> another option is to bind the serializer/deserializer with
> > > > >> >> ProducerRecord/ConsumerRecord (please see the detailed proposal below.)
> > > > >> >> The arguments for this option are:
> > > > >> >>   A. A single producer could send different message types. There are
> > > > >> >>      several use cases in LinkedIn for a per record serializer
> > > > >> >>      - In Samza, there are some in-stream order-sensitive control
> > > > >> >>        messages having a different deserializer from other messages.
> > > > >> >>      - There are use cases which need support for sending both Avro
> > > > >> >>        messages and raw bytes.
> > > > >> >>      - Some use cases need to deserialize some Avro messages into
> > > > >> >>        generic records and some other messages into specific records.
> > > > >> >>   B. In the current proposal, the serializer/deserializer is instantiated
> > > > >> >>      according to config. Compared with that, binding the serializer with
> > > > >> >>      ProducerRecord and ConsumerRecord is less error prone.
> > > > >> >>
> > > > >> >>
> > > > >> >> This option includes the following changes:
> > > > >> >>   A. Add serializer and deserializer interfaces to replace the serializer
> > > > >> >>      instance from config.
> > > > >> >>        public interface Serializer <K, V> {
> > > > >> >>            public byte[] serializeKey(K key);
> > > > >> >>            public byte[] serializeValue(V value);
> > > > >> >>        }
> > > > >> >>        public interface Deserializer <K, V> {
> > > > >> >>            public K deserializeKey(byte[] key);
> > > > >> >>            public V deserializeValue(byte[] value);
> > > > >> >>        }
> > > > >> >>
> > > > >> >>   B. Make ProducerRecord and ConsumerRecord abstract classes implementing
> > > > >> >>      Serializer <K, V> and Deserializer <K, V> respectively.
> > > > >> >>        public abstract class ProducerRecord <K, V> implements
> > > > >> >>            Serializer <K, V> {...}
> > > > >> >>        public abstract class ConsumerRecord <K, V> implements
> > > > >> >>            Deserializer <K, V> {...}
> > > > >> >>
> > > > >> >>   C. Instead of instantiating the serializer/deserializer from config, let
> > > > >> >>      concrete ProducerRecord/ConsumerRecord classes extend the abstract
> > > > >> >>      class and override the serialize/deserialize methods.
> > > > >> >>
> > > > >> >>        public class AvroProducerRecord extends
> > > > >> >>            ProducerRecord <String, GenericRecord> {
> > > > >> >>            ...
> > > > >> >>            @Override
> > > > >> >>            public byte[] serializeKey(String key) {...}
> > > > >> >>            @Override
> > > > >> >>            public byte[] serializeValue(GenericRecord value) {...}
> > > > >> >>        }
> > > > >> >>
> > > > >> >>        public class AvroConsumerRecord extends
> > > > >> >>            ConsumerRecord <String, GenericRecord> {
> > > > >> >>            ...
> > > > >> >>            @Override
> > > > >> >>            public String deserializeKey(byte[] key) {...}
> > > > >> >>            @Override
> > > > >> >>            public GenericRecord deserializeValue(byte[] value) {...}
> > > > >> >>        }
> > > > >> >>
> > > > >> >>   D. The producer API changes to
> > > > >> >>        public class KafkaProducer {
> > > > >> >>            ...
> > > > >> >>
> > > > >> >>            Future<RecordMetadata> send (ProducerRecord <K, V> record) {
> > > > >> >>                ...
> > > > >> >>                byte[] key = record.serializeKey(record.key);
> > > > >> >>                byte[] value = record.serializeValue(record.value);
> > > > >> >>                BytesProducerRecord bytesProducerRecord = new
> > > > >> >>                    BytesProducerRecord(topic, partition, key, value);
> > > > >> >>                ...
> > > > >> >>            }
> > > > >> >>            ...
> > > > >> >>        }
> > > > >> >>
> > > > >> >>
> > > > >> >>
> > > > >> >> We also had some brainstorming in LinkedIn and here is the feedback:
> > > > >> >>
> > > > >> >> If the community decides to add the serialization back to the new
> > > > >> >> producer, besides the current proposal which changes the new producer API
> > > > >> >> to be a template, there are some other options raised during our
> > > > >> >> discussion:
> > > > >> >>   1) Rather than change the current new producer API, we can provide a
> > > > >> >>      wrapper of the current new producer (e.g. KafkaSerializedProducer) and
> > > > >> >>      make it available to users, as there is value in the simplicity of the
> > > > >> >>      current API.
> > > > >> >>
> > > > >> >>   2) If we decide to go with the templated new producer API, according to
> > > > >> >>      experience in LinkedIn, it might be worth considering instantiating
> > > > >> >>      the serializer in code instead of from config so we can avoid runtime
> > > > >> >>      errors due to dynamic instantiation from config, which is more error
> > > > >> >>      prone. If that is the case, the producer API could be changed to
> > > > >> >>      something like:
> > > > >> >>        producer = new Producer<K, V>(KeySerializer<K>,
> > > > >> >>            ValueSerializer<V>)
> > > > >> >>
> > > > >> >> --Jiangjie (Becket) Qin
> > > > >> >>
> > > > >> >>
> > > > >> >> On 11/24/14, 5:58 PM, "Jun Rao" <jun...@gmail.com> wrote:
> > > > >> >>
> > > > >> >> >Hi, Everyone,
> > > > >> >> >
> > > > >> >> >I'd like to start a discussion on whether it makes sense to add the
> > > > >> >> >serializer api back to the new java producer. Currently, the new java
> > > > >> >> >producer takes a byte array for both the key and the value. While this api
> > > > >> >> >is simple, it pushes the serialization logic into the application. This
> > > > >> >> >makes it hard to reason about what type of data is being sent to Kafka and
> > > > >> >> >also makes it hard to share an implementation of the serializer. For
> > > > >> >> >example, to support Avro, the serialization logic could be quite involved
> > > > >> >> >since it might need to register the Avro schema in some remote registry
> > > > >> >> >and maintain a schema cache locally, etc. Without a serialization api, it's
> > > > >> >> >impossible to share such an implementation so that people can easily
> > > > >> >> >reuse. We sort of overlooked this implication during the initial
> > > > >> >> >discussion of the producer api.
> > > > >> >> >
> > > > >> >> >So, I'd like to propose an api change to the new producer by adding back
> > > > >> >> >the serializer api similar to what we had in the old producer.
> > > > >> >> >Specifically, the proposed api changes are the following.
> > > > >> >> >
> > > > >> >> >First, we change KafkaProducer to take generic types K and V for the key
> > > > >> >> >and the value, respectively.
> > > > >> >> >
> > > > >> >> >public class KafkaProducer<K,V> implements Producer<K,V> {
> > > > >> >> >
> > > > >> >> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
> > > > >> >> >        Callback callback);
> > > > >> >> >
> > > > >> >> >    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
> > > > >> >> >}
> > > > >> >> >
> > > > >> >> >Second, we add two new configs, one for the key serializer and another for
> > > > >> >> >the value serializer. Both serializers will default to the byte array
> > > > >> >> >implementation.
> > > > >> >> >
> > > > >> >> >public class ProducerConfig extends AbstractConfig {
> > > > >> >> >
> > > > >> >> >    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > >> >> >        "org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > >> >> >        Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
> > > > >> >> >    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
> > > > >> >> >        "org.apache.kafka.clients.producer.ByteArraySerializer",
> > > > >> >> >        Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
> > > > >> >> >}
> > > > >> >> >
> > > > >> >> >Both serializers will implement the following interface.
> > > > >> >> >
> > > > >> >> >public interface Serializer<T> extends Configurable {
> > > > >> >> >    public byte[] serialize(String topic, T data, boolean isKey);
> > > > >> >> >
> > > > >> >> >    public void close();
> > > > >> >> >}
> > > > >> >> >
> > > > >> >> >This is more or less the same as what's in the old producer. The slight
> > > > >> >> >differences are (1) the serializer now only requires a parameter-less
> > > > >> >> >constructor; (2) the serializer has a configure() and a close() method for
> > > > >> >> >initialization and cleanup, respectively; (3) the serialize() method
> > > > >> >> >additionally takes the topic and an isKey indicator, both of which are
> > > > >> >> >useful for things like schema registration.
> > > > >> >> >
> > > > >> >> >The detailed changes are included in KAFKA-1797. For completeness, I also
> > > > >> >> >made the corresponding changes for the new java consumer api as well.
> > > > >> >> >
> > > > >> >> >Note that the proposed api changes are incompatible with what's in the
> > > > >> >> >0.8.2 branch. However, if those api changes are beneficial, it's probably
> > > > >> >> >better to include them now in the 0.8.2 release, rather than later.
> > > > >> >> >
> > > > >> >> >I'd like to discuss mainly two things in this thread.
> > > > >> >> >1. Do people feel that the proposed api changes are reasonable?
> > > > >> >> >2. Are there any concerns of including the api changes in the 0.8.2 final
> > > > >> >> >release?
> > > > >> >> >
> > > > >> >> >Thanks,
> > > > >> >> >
> > > > >> >> >Jun
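
As a footnote to the original proposal quoted at the bottom of this thread, the three differences it lists (parameter-less constructor, configure()/close() lifecycle, and the topic/isKey arguments to serialize()) can be sketched roughly as follows. This is an illustration under assumptions, not the code from KAFKA-1797: the Configurable and Serializer interfaces are local copies of the shapes described in the proposal, and EncodingStringSerializer, fromClassName and the "demo.encoding" config key are hypothetical names invented for the example.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class SerializerLifecycleDemo {

    /** Local stand-in for the Configurable interface named in the proposal. */
    public interface Configurable {
        void configure(Map<String, ?> configs);
    }

    /** Local copy of the proposed interface shape. */
    public interface Serializer<T> extends Configurable {
        byte[] serialize(String topic, T data, boolean isKey);

        void close();
    }

    /** Hypothetical serializer with the parameter-less constructor the proposal requires. */
    public static class EncodingStringSerializer implements Serializer<String> {
        private Charset charset = StandardCharsets.UTF_8;
        private boolean closed = false;

        @Override
        public void configure(Map<String, ?> configs) {
            // "demo.encoding" is a made-up key used only in this sketch.
            Object name = configs.get("demo.encoding");
            if (name != null) {
                charset = Charset.forName(name.toString());
            }
        }

        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            if (closed) {
                throw new IllegalStateException("serializer is closed");
            }
            // topic and isKey are unused here; a schema-aware serializer could
            // use them to pick or register a schema.
            return data == null ? null : data.getBytes(charset);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Instantiates a serializer from its class name, the way a config-driven producer might. */
    @SuppressWarnings("unchecked")
    public static Serializer<String> fromClassName(String className, Map<String, ?> configs)
            throws Exception {
        Serializer<String> serializer =
                (Serializer<String>) Class.forName(className).getDeclaredConstructor().newInstance();
        serializer.configure(configs);
        return serializer;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> configs = Collections.singletonMap("demo.encoding", "UTF-16BE");
        Serializer<String> serializer =
                fromClassName(EncodingStringSerializer.class.getName(), configs);
        byte[] bytes = serializer.serialize("events", "hi", false);
        System.out.println(bytes.length); // prints 4: two UTF-16BE code units, no byte order mark
        serializer.close();
    }
}
```

Instantiating by class name is what makes the parameter-less constructor necessary, and it is also the source of the runtime-error concern raised elsewhere in the thread: a misspelled class name only fails when the producer starts.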